-
Notifications
You must be signed in to change notification settings - Fork 25
Expand file tree
/
Copy pathkubernetes_client.py
More file actions
1992 lines (1767 loc) · 92.4 KB
/
kubernetes_client.py
File metadata and controls
1992 lines (1767 loc) · 92.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Kubernetes client for managing cluster operations and resources.""" # pylint: disable=too-many-lines
import time
from typing import Optional
import os
import uuid
import glob
import yaml
import requests
from kubernetes import client, config
from kubernetes.stream import stream
from utils.logger_config import get_logger, setup_logging
from utils.common import save_info_to_file
from utils.constants import UrlConstants
# Taint keys checked by KubernetesClient._is_node_untainted: a node carrying any
# of these keys with effect NoSchedule or NoExecute is reported as not ready.
# https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/#taint-based-evictions
# https://kubernetes.io/docs/reference/labels-annotations-taints/
builtin_taints_keys = [
    "node.kubernetes.io/not-ready",
    "node.kubernetes.io/unreachable",
    "node.kubernetes.io/pid-pressure",
    "node.kubernetes.io/out-of-disk",
    "node.kubernetes.io/memory-pressure",
    "node.kubernetes.io/disk-pressure",
    "node.kubernetes.io/network-unavailable",
    "node.kubernetes.io/unschedulable",
    "node.cloudprovider.kubernetes.io/uninitialized",
    "node.cloudprovider.kubernetes.io/shutdown",
]
# Configure logging (runs at import time) and create the module-level logger
# used by every method of KubernetesClient.
setup_logging()
logger = get_logger(__name__)
class KubernetesClient:
"""Client for managing Kubernetes cluster operations and resources."""
def __init__(self, config_file=None):
    """
    Load the kubeconfig and build the API clients.
    :param config_file: Path handed to ``config.load_kube_config``; it is also
        remembered on the instance so set_context can reload the same file.
    """
    self.config_file = config_file
    config.load_kube_config(config_file=self.config_file)
    self._setup_clients()
def _setup_clients(self):
    """
    Initialize or reinitialize all Kubernetes API clients.
    This method is used by both __init__ and set_context to create client instances.
    Each call replaces the four client attributes with freshly constructed objects.
    """
    # Core API: used in this class for nodes, pods, namespaces, PVCs and services.
    self.api = client.CoreV1Api()
    # Apps API: exposed to callers through get_app_client.
    self.app = client.AppsV1Api()
    # Storage API: used for volume attachments.
    self.storage = client.StorageV1Api()
    # Batch API: used for reading job status.
    self.batch = client.BatchV1Api()
def get_app_client(self):
    """Return the AppsV1Api client held by this instance."""
    apps_client = self.app
    return apps_client
def get_api_client(self):
    """Return the CoreV1Api client held by this instance."""
    core_client = self.api
    return core_client
def describe_node(self, node_name):
    """Read the node named ``node_name`` through the core API and return it."""
    node = self.api.read_node(node_name)
    return node
def get_nodes(self, label_selector=None, field_selector=None):
    """Return the items of the node list filtered by the given label and field selectors."""
    node_list = self.api.list_node(
        label_selector=label_selector,
        field_selector=field_selector,
    )
    return node_list.items
def get_ready_nodes(self, label_selector=None, field_selector=None):
    """
    Return the nodes matching the selectors that can currently take workloads.
    A node is kept only when both checks pass:
    - _is_node_schedulable: 'Ready' is True, 'NetworkUnavailable' is not True,
      and spec.unschedulable is not True
    - _is_node_untainted: no builtin taint key with effect 'NoSchedule' or 'NoExecute'
    """
    ready = []
    candidates = self.get_nodes(label_selector=label_selector, field_selector=field_selector)
    for candidate in candidates:
        # The taint check is only evaluated for nodes that passed the condition check.
        if not self._is_node_schedulable(candidate):
            continue
        if self._is_node_untainted(candidate):
            ready.append(candidate)
    return ready
def _is_node_schedulable(self, node):
status_conditions = {cond.type: cond.status for cond in node.status.conditions}
is_schedulable = (
status_conditions.get("Ready") == "True"
and status_conditions.get("NetworkUnavailable") != "True"
and node.spec.unschedulable is not True
)
if not is_schedulable:
logger.info("Node NOT Ready: '%s' is not schedulable. "
"status_conditions: %s. unschedulable: %s",
node.metadata.name, status_conditions, node.spec.unschedulable)
return is_schedulable
def _is_node_untainted(self, node):
if not node.spec.taints:
return True
for taint in node.spec.taints:
if (taint.key in builtin_taints_keys and
taint.effect in ("NoSchedule", "NoExecute")):
logger.info("Node NOT Ready: '%s' has taint '%s' with effect '%s'",
node.metadata.name, taint.key, taint.effect)
return False
return True
def _is_ready_pod(self, pod):
"""Check if a pod is in Ready state."""
for condition in pod.status.conditions:
if condition.type == "Ready" and condition.status == "True":
return True
return False
def get_pods_by_namespace(self, namespace, label_selector=None, field_selector=None):
    """Return the pods of ``namespace`` filtered by the given label and field selectors."""
    pod_list = self.api.list_namespaced_pod(
        namespace=namespace,
        label_selector=label_selector,
        field_selector=field_selector,
    )
    return pod_list.items
def get_ready_pods_by_namespace(self, namespace=None, label_selector=None, field_selector=None):
    """Return the matching pods whose phase is "Running" and that pass _is_ready_pod."""
    ready_pods = []
    for pod in self.get_pods_by_namespace(namespace=namespace,
                                          label_selector=label_selector,
                                          field_selector=field_selector):
        # Readiness is only evaluated for pods that are already Running.
        if pod.status.phase != "Running":
            continue
        if self._is_ready_pod(pod):
            ready_pods.append(pod)
    return ready_pods
def get_persistent_volume_claims_by_namespace(self, namespace):
    """Return every persistent volume claim listed in ``namespace``."""
    claim_list = self.api.list_namespaced_persistent_volume_claim(namespace=namespace)
    return claim_list.items
def get_bound_persistent_volume_claims_by_namespace(self, namespace):
    """Return the persistent volume claims in ``namespace`` whose phase is "Bound"."""
    bound_claims = []
    for claim in self.get_persistent_volume_claims_by_namespace(namespace=namespace):
        if claim.status.phase == "Bound":
            bound_claims.append(claim)
    return bound_claims
def delete_persistent_volume_claim_by_namespace(self, namespace):
    """
    Delete every persistent volume claim found in ``namespace``.
    Best effort: an API error on one claim is logged and the remaining
    claims are still attempted.
    """
    for claim in self.get_persistent_volume_claims_by_namespace(namespace=namespace):
        try:
            self.api.delete_namespaced_persistent_volume_claim(
                claim.metadata.name,
                namespace,
                body=client.V1DeleteOptions(),
            )
        except client.rest.ApiException as e:
            logger.error("Error deleting PVC '%s': %s", claim.metadata.name, e)
def get_volume_attachments(self):
    """Return every volume attachment listed by the storage API."""
    attachment_list = self.storage.list_volume_attachment()
    return attachment_list.items
def get_attached_volume_attachments(self):
    """Return the volume attachments whose ``status.attached`` flag is truthy."""
    attached = []
    for attachment in self.get_volume_attachments():
        if attachment.status.attached:
            attached.append(attachment)
    return attached
def create_namespace(self, namespace):
    """
    Get-or-create a namespace.
    Reads the namespace first and returns it when it exists; on a 404 the
    namespace is created and the created object is returned. Any other API
    error is re-raised.
    """
    try:
        existing = self.api.read_namespace(namespace)
    except client.rest.ApiException as e:
        if e.status != 404:
            raise e
        # Not found: create it from the requested name.
        body = client.V1Namespace(metadata=client.V1ObjectMeta(name=namespace))
        return self.api.create_namespace(body)
    logger.info(f"Namespace '{existing.metadata.name}' already exists.")
    return existing
def delete_namespace(self, namespace):
    """Request deletion of ``namespace`` and return the API response."""
    response = self.api.delete_namespace(namespace)
    return response
# TODO: Explore https://kustomize.io for templating
def create_template(self, template_path: str, replacements: dict) -> str:
    """
    Render a template file by substituting ``{{key}}`` placeholders.
    :param template_path: Path to the YAML template file.
    :param replacements: Mapping of placeholder name to value; each value is
        converted with str() before substitution.
    :return: The rendered template text (also logged at info level).
    :raises FileNotFoundError: If template_path is not an existing file.
    :raises Exception: Wrapping any error raised while reading or rendering.
    """
    template_exists = os.path.isfile(template_path)
    if not template_exists:
        raise FileNotFoundError(f"Template file not found: {template_path}")
    try:
        with open(template_path, "r", encoding="utf-8") as file:
            template = file.read()
        for key, value in replacements.items():
            # Placeholders are written as {{key}} in the template.
            template = template.replace(f"{{{{{key}}}}}", str(value))
        logger.info(f"Final template: \n{template}")
    except Exception as e:
        raise Exception(f"Error processing template file {template_path}: {str(e)}") from e
    return template
def create_node(self, template):
    """
    Create a Node from a YAML document, replacing it when it already exists.
    :param template: YAML text describing a resource of kind "Node".
    :return: Name of the created (or replaced) Node.
    :raises ValueError: If the document's kind is not "Node".
    :raises Exception: If the YAML cannot be parsed or the create call fails
        with a status other than 409.
    """
    try:
        node_obj = yaml.safe_load(template)
        kind = node_obj["kind"]
        if kind != "Node":
            raise ValueError("The provided YAML template does not define a Node resource.")
        created = self.api.create_node(body=node_obj)
        return created.metadata.name
    except yaml.YAMLError as e:
        raise Exception(f"Error parsing Node template: {str(e)}") from e
    except client.rest.ApiException as e:
        if e.status != 409:
            raise Exception(f"Error creating Node: {str(e)}") from e
        # 409 Conflict: the Node already exists, so overwrite it instead.
        existing_name = node_obj["metadata"]["name"]
        self.api.replace_node(name=existing_name, body=node_obj)
        return node_obj["metadata"]["name"]
def delete_node(self, node_name):
    """
    Delete the Node called ``node_name``.
    A 404 from the API is logged and treated as success; any other API error
    is wrapped in an Exception.
    :param node_name: Name of the Node to delete.
    :return: None
    """
    try:
        self.api.delete_node(name=node_name, body=client.V1DeleteOptions())
    except client.rest.ApiException as e:
        if e.status != 404:
            raise Exception(f"Error deleting Node '{node_name}': {str(e)}") from e
        logger.info(f"Node '{node_name}' not found.")
    else:
        logger.info(f"Node '{node_name}' deleted successfully.")
def wait_for_nodes_ready(self, node_count, operation_timeout_in_minutes, label_selector=None):
    """
    Poll every 10 seconds until exactly ``node_count`` matching nodes are ready.
    :param node_count: The expected number of ready nodes (exact match).
    :param operation_timeout_in_minutes: How long to keep polling, in minutes.
    :param label_selector: Optional label selector used to filter nodes.
    :return: The list of ready nodes once the count matches.
    :raises Exception: If the count does not match before the timeout.
    """
    deadline = time.time() + (operation_timeout_in_minutes * 60)
    ready_node_count = 0
    logger.info(f"Validating {node_count} nodes with label {label_selector} are ready.")
    while time.time() < deadline:
        ready_nodes = self.get_ready_nodes(label_selector=label_selector)
        ready_node_count = len(ready_nodes)
        logger.info(f"Currently {ready_node_count} nodes are ready.")
        if ready_node_count != node_count:
            logger.info(f"Waiting for {node_count} nodes to be ready.")
            time.sleep(10)
            continue
        return ready_nodes
    raise Exception(f"Only {ready_node_count} nodes are ready, expected {node_count} nodes!")
def wait_for_pods_ready(self, operation_timeout_in_minutes, namespace="default", pod_count=None, label_selector=None):
    """
    Poll every 10 seconds until the expected number of matching pods are ready.
    :param operation_timeout_in_minutes: How long to keep polling, in minutes.
    :param namespace: The namespace to filter pods.
    :param pod_count: Expected number of ready pods. When None, the expected
        count is re-read on every iteration as the number of pods matching
        the selector.
    :param label_selector: The label selector used to filter pods.
    :return: List of ready pods
    :raises Exception: If no pods match (dynamic count only) or the ready
        count does not reach the expected count before the timeout.
    """
    pods = []
    timeout = time.time() + (operation_timeout_in_minutes * 60)
    dynamic_count = pod_count is None
    if dynamic_count:
        logger.info(f"Validating all pods with label {label_selector} are ready (dynamic count).")
    else:
        logger.info(f"Validating {pod_count} pods with label {label_selector} are ready.")
    while time.time() < timeout:
        if dynamic_count:
            # Re-count on each pass so pods created meanwhile are included.
            current_pod_count = len(self.get_pods_by_namespace(
                namespace=namespace, label_selector=label_selector
            ))
            if current_pod_count == 0:
                raise Exception(f"No pods found with selector '{label_selector}' in namespace '{namespace}'")
        else:
            current_pod_count = pod_count
        pods = self.get_ready_pods_by_namespace(namespace=namespace, label_selector=label_selector)
        if len(pods) == current_pod_count:
            return pods
        logger.info(f"Waiting for {current_pod_count} pods to be ready. Currently {len(pods)} pods are ready.")
        time.sleep(10)
    # Timed out: work out the expected count once more for the error message.
    if dynamic_count:
        final_expected_count = len(self.get_pods_by_namespace(
            namespace=namespace, label_selector=label_selector
        ))
    else:
        final_expected_count = pod_count
    raise Exception(f"Only {len(pods)} pods are ready, expected {final_expected_count} pods!")
def wait_for_labeled_pods_ready(self, label_selector: str, namespace: str = "default", timeout_in_minutes: int = 5) -> None:
    """
    Wait until every pod currently matching a label selector is ready.
    The expected pod count is taken once, up front, from the pods that match
    the selector at call time.
    Args:
        label_selector: Label selector for the pods
        namespace: Namespace where pods exist
        timeout_in_minutes: Timeout in minutes passed to wait_for_pods_ready
    Raises:
        Exception: If no pods match the selector.
    """
    matching = self.get_pods_by_namespace(namespace=namespace, label_selector=label_selector)
    if not matching:
        raise Exception(f"No pods found with selector '{label_selector}' in namespace '{namespace}'")
    self.wait_for_pods_ready(
        pod_count=len(matching),
        operation_timeout_in_minutes=timeout_in_minutes,
        namespace=namespace,
        label_selector=label_selector,
    )
def wait_for_pods_completed(self, label_selector, namespace="default", timeout=300, pod_count=None):
    """
    Waits for pods with a specific label to complete successfully within a specified timeout.
    Polls every 10 seconds and logs each pod's phase on every pass.
    :param label_selector: The label selector to filter pods.
    :param namespace: The namespace where the pods are located (default: "default").
    :param timeout: The timeout in seconds to wait for the pods to complete (default: 300 seconds).
    :param pod_count: Expected number of pods in phase "Succeeded". When None,
        the number of pods matching the selector on each pass is used.
    :return: List of the pods in phase "Succeeded" once their number equals the expected count.
    :raises Exception: If no pods match the selector, or the pods do not
        complete within the timeout.
    """
    if pod_count is not None:
        # Quoting and spacing now match the message in the else branch.
        logger.info(f"Waiting for {pod_count} pod(s) with label '{label_selector}' in namespace '{namespace}' to complete")
    else:
        logger.info(f"Waiting for pods with label '{label_selector}' in namespace '{namespace}' to complete")
    start_time = time.time()
    while time.time() - start_time < timeout:
        pods = self.get_pods_by_namespace(
            namespace=namespace, label_selector=label_selector
        )
        if not pods:
            raise Exception(f"No pods found with selector '{label_selector}' in namespace '{namespace}'")
        current_pod_count = len(pods) if pod_count is None else pod_count
        for pod in pods:
            logger.info(f"Pod '{pod.metadata.name}' status: {pod.status.phase}")
        completed_pods = [pod for pod in pods if pod.status.phase == "Succeeded"]
        if len(completed_pods) == current_pod_count:
            return completed_pods
        sleep_time = 10
        logger.info(f"Waiting for {sleep_time} seconds before checking pod status again.")
        time.sleep(sleep_time)
    raise Exception(
        f"Pods with label '{label_selector}' in namespace '{namespace}' did not complete within {timeout} seconds."
    )
def wait_for_job_completed(self, job_name, namespace="default", timeout=300):
    """
    Poll a job every 30 seconds until it succeeds, fails, or the timeout expires.
    :param job_name: The name of the job to wait for.
    :param namespace: The namespace where the job is located (default: "default").
    :param timeout: The timeout in seconds to wait for the job to complete (default: 300 seconds).
    :return: The job's name once its status reports at least one succeeded pod.
    :raises Exception: If the job reports a failure, is not found (404), or
        does not complete within the timeout. Other API errors are re-raised as is.
    """
    started_at = time.time()
    while time.time() - started_at < timeout:
        try:
            job = self.batch.read_namespaced_job(name=job_name, namespace=namespace)
        except client.rest.ApiException as e:
            if e.status != 404:
                raise e
            raise Exception(
                f"Job '{job_name}' not found in namespace '{namespace}'."
            ) from e
        if job.status.succeeded is not None and job.status.succeeded > 0:
            logger.info(
                f"Job '{job_name}' in namespace '{namespace}' has completed successfully."
            )
            return job.metadata.name
        if job.status.failed is not None and job.status.failed > 0:
            raise Exception(
                f"Job '{job_name}' in namespace '{namespace}' has failed."
            )
        logger.info(
            f"Job '{job_name}' in namespace '{namespace}' is still running with status:\n{job.status}"
        )
        sleep_time = 30
        logger.info(
            f"Waiting {sleep_time} seconds before checking job status again."
        )
        time.sleep(sleep_time)
    raise Exception(
        f"Job '{job_name}' in namespace '{namespace}' did not complete within {timeout} seconds."
    )
def get_pod_logs(self, pod_name, namespace="default", container=None, tail_lines=None):
    """
    Fetch the logs of one pod.
    :param pod_name: Name of the pod
    :param namespace: Namespace where the pod is located (default: "default")
    :param container: Container name if pod has multiple containers (optional)
    :param tail_lines: Number of lines to return from the end of the logs (optional)
    :return: The ``data`` attribute of the un-preloaded API response
    :raises Exception: Wrapping any API error raised by the log request
    """
    try:
        response = self.api.read_namespaced_pod_log(
            name=pod_name,
            namespace=namespace,
            container=container,
            tail_lines=tail_lines,
            _preload_content=False,  # Avoid breaking data format
        )
        return response.data
    except client.rest.ApiException as e:
        raise Exception(f"Error getting logs for pod '{pod_name}' in namespace '{namespace}': {str(e)}") from e
def run_pod_exec_command(self, pod_name: str, command: str, container_name: str = "", dest_path: str = "", namespace: str = "default") -> str:
    """
    Executes a command in a specified container within a Kubernetes pod and optionally saves the output to a file.
    The command is run through ``/bin/sh -c``. Any output on stderr aborts the
    call with an exception.
    Args:
        pod_name (str): The name of the pod where the command will be executed.
        command (str): The command to be executed in the container.
        container_name (str, optional): The container to run in. When empty, no container is passed to the exec call.
        dest_path (str, optional): The file path where the command output will be saved. Defaults to "".
        namespace (str, optional): The Kubernetes namespace where the pod is located. Defaults to "default".
    Returns:
        str: The combined standard output of the executed command.
    Raises:
        Exception: If the command writes anything to stderr.
        OSError: If dest_path cannot be opened for writing.
    """
    commands = ['/bin/sh', '-c', command]
    logger.info(
        f"Executing command in pod '{pod_name}' in namespace '{namespace}': {' '.join(commands)}"
    )
    # Build kwargs conditionally
    stream_kwargs = {
        "name": pod_name,
        "namespace": namespace,
        "command": commands,
        "stderr": True,
        "stdin": False,
        "stdout": True,
        "tty": False,
        "_preload_content": False,
    }
    # Only include container if container_name is provided
    if container_name:
        stream_kwargs["container"] = container_name
    resp = stream(self.api.connect_get_namespaced_pod_exec, **stream_kwargs)
    res = []
    file = None
    try:
        # Open the destination inside the try so that a failing open() still
        # closes the exec stream in the finally block below.
        if dest_path:
            file = open(dest_path, 'wb')  # pylint: disable=consider-using-with
        while resp.is_open():
            resp.update(timeout=1)
            if resp.peek_stdout():
                stdout = resp.read_stdout()
                res.append(stdout)
                clean_stdout = stdout.rstrip('\n\r')
                logger.info("STDOUT: %s", clean_stdout)
                if file:
                    file.write(stdout.encode('utf-8'))
                    logger.info("Saved response to file: %s", dest_path)
            if resp.peek_stderr():
                error_msg = resp.read_stderr()
                raise Exception(f"Error occurred while executing command in pod: {error_msg}")
    finally:
        resp.close()
        if file is not None:
            file.close()
    return ''.join(res)
def get_daemonsets_pods_allocated_resources(self, namespace, node_name):
    """
    Sum the CPU and memory requests of the pods in a namespace that run on a node.
    The selection is every pod in ``namespace`` whose spec.nodeName equals
    ``node_name``; no check is made that a pod belongs to a DaemonSet.
    CPU quantities may be millicores ("250m") or cores ("1", "0.5"). Memory
    quantities may be plain bytes or use a binary (Ki..Ei) or decimal (k..E)
    suffix.
    Args:
        namespace (str): Namespace to list pods from.
        node_name (str): Node whose pods are counted.
    Returns:
        tuple: ``(cpu_millicores, memory_kib)`` as integers.
    Raises:
        ValueError: If a request quantity cannot be parsed.
    """
    memory_units = {
        "Ki": 1024, "Mi": 1024 ** 2, "Gi": 1024 ** 3,
        "Ti": 1024 ** 4, "Pi": 1024 ** 5, "Ei": 1024 ** 6,
        "k": 1000, "M": 1000 ** 2, "G": 1000 ** 3,
        "T": 1000 ** 4, "P": 1000 ** 5, "E": 1000 ** 6,
    }

    def cpu_to_millicores(quantity):
        # "250m" is already millicores; anything else is a (possibly fractional) core count.
        text = str(quantity).strip()
        if text.endswith("m"):
            return int(float(text[:-1]))
        return int(round(float(text) * 1000))

    def memory_to_bytes(quantity):
        text = str(quantity).strip()
        for suffix, factor in memory_units.items():
            if text.endswith(suffix):
                return int(float(text[:-len(suffix)]) * factor)
        return int(float(text))

    pods = self.get_pods_by_namespace(namespace=namespace, field_selector=f"spec.nodeName={node_name}")
    cpu_request = 0
    memory_bytes = 0
    for pod in pods:
        for container in pod.spec.containers:
            # A container may declare no resources at all, or resources without requests.
            requests_map = container.resources.requests if container.resources else None
            if not requests_map:
                continue
            logger.info("Pod %s has container %s with resources %s",
                        pod.metadata.name, container.name, requests_map)
            cpu_request += cpu_to_millicores(requests_map.get("cpu", "0m"))
            memory_bytes += memory_to_bytes(requests_map.get("memory", "0Mi"))
    return cpu_request, memory_bytes // 1024  # Convert to KiB
def get_daemonsets_pods_count(self, namespace, node_name):
    """
    Count the pods of a namespace that are placed on a given node.
    Args:
        namespace (str): The namespace to list pods from.
        node_name (str): The node name matched against spec.nodeName.
    Returns:
        int: Number of pods in the namespace whose spec.nodeName is node_name.
    """
    node_filter = f"spec.nodeName={node_name}"
    pods_on_node = self.get_pods_by_namespace(namespace=namespace, field_selector=node_filter)
    return len(pods_on_node)
def set_context(self, context_name):
    """
    Reload the kubeconfig under another context and rebuild all API clients.
    Args:
        context_name (str): Name of the Kubernetes context to switch to
    Returns:
        None
    Raises:
        Exception: Wrapping any error raised while loading the context or
            rebuilding the clients
    """
    kubeconfig = self.config_file
    try:
        config.load_kube_config(config_file=kubeconfig, context=context_name)
        self._setup_clients()
        logger.info(f"Successfully switched to context: {context_name}")
    except Exception as e:
        raise Exception(f"Failed to switch to context {context_name}: {e}") from e
def get_pods_name_and_ip(self, label_selector="", namespace="default"):
    """
    List name, pod IP and host IP for every pod matching a selector.
    Args:
        label_selector (str, optional): The label selector to filter pods. Defaults to an empty string.
        namespace (str, optional): The namespace to search for pods. Defaults to "default".
    Returns:
        list: One dict per pod with the keys "name", "ip" (pod IP) and "node_ip" (host IP).
    """
    summaries = []
    for pod in self.get_pods_by_namespace(namespace=namespace, label_selector=label_selector):
        summaries.append({
            "name": pod.metadata.name,
            "ip": pod.status.pod_ip,
            "node_ip": pod.status.host_ip,
        })
    return summaries
def get_pod_name_and_ip(self, label_selector="", namespace="default"):
    """
    Return the name/IP summary of the first pod matching a selector.
    Args:
        label_selector (str, optional): The label selector to filter pods. Defaults to an empty string.
        namespace (str, optional): The namespace to search for pods. Defaults to "default".
    Returns:
        dict: The first entry of get_pods_name_and_ip, with keys "name", "ip" and "node_ip".
    Raises:
        Exception: If no pods are found matching the given label selector and namespace.
    """
    pods = self.get_pods_name_and_ip(namespace=namespace, label_selector=label_selector)
    logger.info(pods)
    if pods:
        return pods[0]
    raise Exception(
        f"No pod found with label: {label_selector} and namespace: {namespace}")
def get_service_external_ip(self, service_name, namespace="default"):
    """
    Return the IP of a service's first load-balancer ingress entry.
    Returns None when the service has no load-balancer ingress entries.
    """
    service = self.api.read_namespaced_service(service_name, namespace)
    ingress_entries = service.status.load_balancer.ingress
    if not ingress_entries:
        return None
    return ingress_entries[0].ip
def get_pod_details(self, namespace="default", label_selector=""):
    """
    Summarise every pod matching a selector in a namespace.
    Returns a list with one dict per pod holding the keys "name", "labels",
    "node_name", "ip", "status" (the pod phase) and "spec" (the pod spec as a dict).
    """
    matching_pods = self.get_pods_by_namespace(
        namespace=namespace, label_selector=label_selector)
    return [
        {
            "name": pod.metadata.name,
            "labels": pod.metadata.labels,
            "node_name": pod.spec.node_name,
            "ip": pod.status.pod_ip,
            "status": pod.status.phase,
            "spec": pod.spec.to_dict(),
        }
        for pod in matching_pods
    ]
def get_node_details(self, node_name):
    """
    Get detailed info about a node
    Returns a dict with the keys "name", "labels", "region", "zone",
    "instance_type", "allocatable", "capacity" and "node_info". Region, zone
    and instance type are read from the node's topology/instance-type labels
    and fall back to "Unknown" when the label is absent.
    Raises:
        Exception: If the API returns a falsy node object.
    """
    node = self.api.read_node(node_name)
    if not node:
        raise Exception(f"Node '{node_name}' not found.")
    # A node without labels may report None; use an empty mapping so the
    # lookups below fall back to "Unknown" instead of raising AttributeError.
    labels = node.metadata.labels or {}
    node_details = {
        "name": node.metadata.name,
        "labels": labels,
        "region": labels.get("topology.kubernetes.io/region", "Unknown"),
        "zone": labels.get("topology.kubernetes.io/zone", "Unknown"),
        "instance_type": labels.get("node.kubernetes.io/instance-type", "Unknown"),
        "allocatable": node.status.allocatable,
        "capacity": node.status.capacity,
        "node_info": node.status.node_info.to_dict(),
    }
    return node_details
def collect_pod_and_node_info(self, namespace="default", label_selector="", result_dir="", role=""):
    """
    Gather details of the matching pods together with the node each runs on,
    then save them to ``<result_dir>/<role>_pod_node_info.json``.
    Each saved entry keeps the pod details under the 'pod' key and the node
    details under the 'node' key so the two never clash. Node details are
    fetched once per distinct node name.
    """
    pods = self.get_pod_details(
        namespace=namespace, label_selector=label_selector)
    logger.info(
        f"Inside collect_pod_and_node_info, The pods details are: {pods}")
    pods_and_nodes = []
    node_cache = {}
    for pod in pods:
        node_name = pod["node_name"]
        logger.info(
            f"Inside collect_pod_and_node_info, The node_name details are: {node_name}")
        try:
            node_info = node_cache[node_name]
        except KeyError:
            # First pod seen on this node: look the node up and remember it.
            node_info = node_cache[node_name] = self.get_node_details(
                node_name=node_name)
        logger.info(
            f"Inside collect_pod_and_node_info, The node_info details are: {node_info}")
        pod_and_node_info = {"pod": pod, "node": node_info}
        logger.info(
            f"Inside collect_pod_and_node_info, The pod_and_node_info details are: {pod_and_node_info}")
        pods_and_nodes.append(pod_and_node_info)
    # Save results
    file_name = os.path.join(result_dir, f"{role}_pod_node_info.json")
    logger.info(
        f"Inside collect_pod_and_node_info, The file_name details are: {file_name}")
    save_info_to_file(pods_and_nodes, file_name)
def verify_nvidia_smi_on_node(self, nodes, namespace="default"):
    """
    Create a pod on each given node and run nvidia-smi to verify GPU access
    For every node this waits up to 600 seconds for "nvidia.com/gpu" to show
    up in the node's allocatable resources, then runs a pod that requests all
    of the node's GPUs and captures its logs. Nodes reporting zero GPUs are skipped.
    Args:
        nodes: List of nodes to verify
        namespace: Namespace to create the pod in (default: "default")
    Returns:
        dict keyed by node name, each value holding "pod_name", "logs" and
        "device_status" (True when the logs contain both "NVIDIA-SMI" and "GPU").
        Returns False if any exception is raised during verification.
    """
    try:
        all_pod_logs = {}
        for node in nodes:
            pod_name = f"gpu-verify-{uuid.uuid4()}"
            node_name = node.metadata.name
            logger.info(f"Verifying NVIDIA drivers on node {node_name}")
            # Re-read the node (rebinding the loop variable) to get current status.
            node = self.describe_node(node_name)
            # Poll once per second, for at most 600 seconds, until the node
            # advertises the "nvidia.com/gpu" allocatable resource.
            start_time = time.time()
            while "nvidia.com/gpu" not in node.status.allocatable and time.time() < start_time + 600:
                node = self.describe_node(node_name)
                logger.info(f"Node allocatable resources: {node.status.allocatable}")
                logger.info(f"Waiting for GPUs to be allocated on node {node_name}...")
                time.sleep(1)
            gpu_count = int(node.status.allocatable.get("nvidia.com/gpu", "0"))
            logger.info(f"Node {node_name} has {gpu_count} GPUs, requesting all for validation")
            # Skip nodes with no GPUs
            if gpu_count == 0:
                logger.warning(f"Skipping node {node_name} as it has no GPUs")
                continue
            # Create pod spec pinned to this node via the hostname node selector;
            # the toleration lets it land on nodes tainted with nvidia.com/gpu.
            pod = client.V1Pod(
                metadata=client.V1ObjectMeta(name=pod_name),
                spec=client.V1PodSpec(
                    containers=[
                        client.V1Container(
                            name="nvidia-test",
                            image="nvidia/cuda:12.2.0-base-ubuntu20.04",
                            command=["/bin/bash", "-c", "nvidia-smi"],
                            resources=client.V1ResourceRequirements(
                                limits={"nvidia.com/gpu": str(gpu_count)}
                            ),
                        )
                    ],
                    node_selector={"kubernetes.io/hostname": node_name},
                    restart_policy="Never",
                    tolerations=[
                        client.V1Toleration(
                            key="nvidia.com/gpu",
                            operator="Exists",
                            effect="NoSchedule",
                        )
                    ],
                ),
            )
            # Create the pod
            logger.info(f"Creating test pod {pod_name} on node {node_name}")
            self.api.create_namespaced_pod(namespace=namespace, body=pod)
            # Wait for pod to reach a terminal phase; after the timeout the
            # logs are fetched anyway, whatever phase the pod is in.
            timeout = time.time() + 120  # 2 minutes timeout
            while time.time() < timeout:
                pod_status = self.api.read_namespaced_pod(
                    name=pod_name, namespace=namespace
                )
                if pod_status.status.phase in ["Succeeded", "Failed"]:
                    break
                time.sleep(2)
            # Get pod logs
            pod_logs = self.get_pod_logs(pod_name=pod_name, namespace=namespace)
            # get_pod_logs returns the raw response data, so decode bytes if needed.
            if isinstance(pod_logs, bytes):
                pod_logs_str = pod_logs.decode('utf-8')
            else:
                pod_logs_str = str(pod_logs)
            logger.info(f"nvidia-smi output: {pod_logs_str}")
            # Check if output contains expected NVIDIA information
            if "NVIDIA-SMI" in pod_logs_str and "GPU" in pod_logs_str:
                logger.info(f"NVIDIA drivers verified on node {node_name}")
                verification_successful = True
            else:
                logger.warning(
                    f"nvidia-smi output does not contain expected NVIDIA information on node {node_name}"
                )
                verification_successful = False
            all_pod_logs[node_name] = {
                "pod_name": pod_name,
                "logs": pod_logs_str,
                "device_status": verification_successful,
            }
            # Clean up the test pod (best effort: a failed delete is only logged).
            # NOTE(review): this is skipped if an earlier step in the loop raises.
            try:
                logger.info(f"Deleting test pod {pod_name}")
                self.api.delete_namespaced_pod(
                    name=pod_name,
                    namespace=namespace,
                    body=client.V1DeleteOptions(),
                )
            except Exception as e:
                logger.warning(f"Error deleting test pod {pod_name}: {str(e)}")
        return all_pod_logs
    except Exception as e:
        # Any failure aborts the whole run; results gathered so far are dropped.
        logger.error(
            f"Error verifying NVIDIA drivers: {str(e)}"
        )
        return False
def apply_manifest_from_url(self, manifest_url, namespace: Optional[str] = None):
    """
    Download a (possibly multi-document) YAML manifest and apply each resource
    through the Kubernetes Python client.
    :param manifest_url: URL of the manifest to apply
    :param namespace: Optional namespace to override the manifest namespace
    :return: None
    :raises Exception: Wrapping any download, parse or apply error
    """
    try:
        # Download with a 30 second timeout; HTTP error statuses raise.
        response = requests.get(manifest_url, timeout=30)
        response.raise_for_status()
        documents = list(yaml.safe_load_all(response.text))
        # Validate and expand manifests (handles List kind and non-dict manifests)
        for manifest in self._expand_and_validate_manifests(documents):
            self._apply_single_manifest(manifest, namespace=namespace)
        logger.info("Successfully applied manifest from %s", manifest_url)
    except Exception as e:
        raise Exception(f"Error applying manifest from {manifest_url}: {str(e)}") from e
def delete_manifest_from_url(
    self,
    manifest_url,
    ignore_not_found: bool = True,
    namespace: Optional[str] = None,
):
    """
    Download a Kubernetes manifest over HTTP and delete every resource in it.
    Equivalent to 'kubectl delete -f <url>'

    Resources are deleted in the reverse of the order in which they appear in
    the manifest.

    :param manifest_url: URL of the manifest to delete
    :param ignore_not_found: If True, don't raise error if resource doesn't exist (equivalent to --ignore-not-found)
    :param namespace: Optional namespace to override the manifest namespace
    :return: None
    :raises Exception: If fetching, parsing or deleting fails; the original
        error is chained as the cause
    """
    try:
        # Download the manifest; HTTP error statuses raise here
        http_response = requests.get(manifest_url, timeout=30)
        http_response.raise_for_status()

        # A single URL may carry several YAML documents
        documents = list(yaml.safe_load_all(http_response.text))

        # Expansion and validation are delegated to the shared helper
        resources = self._expand_and_validate_manifests(documents)

        # Walk the manifest backwards so later entries are removed first
        resources.reverse()
        for resource in resources:
            self._delete_single_manifest(
                resource,
                ignore_not_found=ignore_not_found,
                namespace=namespace,
            )

        logger.info("Successfully deleted manifest from %s", manifest_url)
    except Exception as exc:
        raise Exception(f"Error deleting manifest from {manifest_url}: {str(exc)}") from exc
def _load_manifests_from_sources(self, manifest_path: Optional[str] = None, manifest_dict: Optional[dict] = None):
    """
    Load manifests from various sources (file, directory, or dictionary).

    A file is parsed as a multi-document YAML stream. A directory is searched
    recursively for ``*.yaml`` / ``*.yml`` files, which are loaded in sorted
    path order. Empty YAML documents are dropped. ``manifest_dict``, when
    given, is appended after anything loaded from ``manifest_path``.

    :param manifest_path: Path to YAML manifest file or folder containing manifest files
    :param manifest_dict: Dictionary containing the manifest
    :return: Tuple of (manifests_list, sources_list)
    :raises FileNotFoundError: If manifest_path is neither an existing file nor a directory
    :raises ValueError: If a directory contains no YAML files, if neither source
        was provided, or if the provided sources contained no manifests
    """
    manifests = []
    sources = []

    # Load manifests from file or directory
    if manifest_path:
        if os.path.isfile(manifest_path):
            yaml_files = [manifest_path]
            source_label = f"file: {manifest_path}"
        elif os.path.isdir(manifest_path):
            # Escape the directory so glob metacharacters in its name
            # ('[', '*', '?') are matched literally instead of as a pattern.
            search_root = glob.escape(manifest_path)
            found = set()
            for ext in ('*.yaml', '*.yml'):
                # '**' with recursive=True also matches the directory itself
                found.update(glob.glob(os.path.join(search_root, '**', ext), recursive=True))
            # Sort to ensure consistent ordering across runs
            yaml_files = sorted(found)
            if not yaml_files:
                raise ValueError(f"No YAML files found in directory: {manifest_path}")
            source_label = f"directory: {manifest_path} ({len(yaml_files)} files)"
        else:
            raise FileNotFoundError(f"Path does not exist: {manifest_path}")

        for yaml_file in yaml_files:
            with open(yaml_file, 'r', encoding='utf-8') as file:
                content = file.read()
            # Handle multiple documents per file; skip None/empty documents
            manifests.extend(doc for doc in yaml.safe_load_all(content) if doc)
        sources.append(source_label)

    # Load manifest from dictionary
    if manifest_dict:
        manifests.append(manifest_dict)
        sources.append("dictionary")

    if not manifests:
        if sources:
            # A path was supplied but it only held empty YAML documents
            raise ValueError(f"No manifests found in {', '.join(sources)}")
        raise ValueError("At least one of manifest_path or manifest_dict must be provided")
    return manifests, sources
def apply_manifest_from_file(
    self,
    manifest_path: Optional[str] = None,
    manifest_dict: Optional[dict] = None,
    namespace: Optional[str] = None,
):
    """
    Apply Kubernetes manifest(s) from file path, folder path, or dictionary.

    Manifests are collected with ``_load_manifests_from_sources``, passed
    through ``_expand_and_validate_manifests`` and applied one by one with
    ``_apply_single_manifest``. Any failure is logged and re-raised unchanged.

    :param manifest_path: Path to YAML manifest file or folder containing manifest files
    :param manifest_dict: Dictionary containing the manifest
    :param namespace: Optional namespace to override the manifest namespace
    :return: None
    """
    try:
        # Load manifests from various sources
        manifests_to_apply, applied_sources = self._load_manifests_from_sources(manifest_path, manifest_dict)
        # Validate and expand manifests (handles List kind and non-dict manifests)
        manifests_to_apply = self._expand_and_validate_manifests(manifests_to_apply)

        total = len(manifests_to_apply)
        namespace_info = f" in namespace '{namespace}'" if namespace else ""
        logger.info(
            "Applying %s manifest(s) from: %s%s",
            total, ', '.join(applied_sources), namespace_info,
        )
        for position, manifest in enumerate(manifests_to_apply, start=1):
            # 'metadata:' with no value parses to None; fall back to an empty
            # mapping so the progress log line cannot raise AttributeError.
            metadata = manifest.get('metadata') or {}
            logger.info(
                "Applying manifest %s/%s: %s/%s",
                position, total,
                manifest.get('kind', 'Unknown'), metadata.get('name', 'Unknown'),
            )
            self._apply_single_manifest(manifest=manifest, namespace=namespace)
        logger.info("Successfully applied %s manifest(s)", total)
    except Exception as e:
        logger.error("Error applying manifest(s): %s", str(e))
        # Bare raise re-raises the active exception as-is
        raise
def delete_manifest_from_file(
    self,
    manifest_path: Optional[str] = None,
    manifest_dict: Optional[dict] = None,
    ignore_not_found: bool = True,
    namespace: Optional[str] = None,
):
    """
    Delete Kubernetes manifest(s) from file path, folder path, or dictionary.
    Equivalent to 'kubectl delete -f <file/folder>'

    Manifests are collected with ``_load_manifests_from_sources``, passed
    through ``_expand_and_validate_manifests`` and deleted in reverse order
    with ``_delete_single_manifest``. Any failure is logged and re-raised
    unchanged.

    :param manifest_path: Path to YAML manifest file or folder containing manifest files
    :param manifest_dict: Dictionary containing the manifest
    :param ignore_not_found: If True, don't raise error if resource doesn't exist (equivalent to --ignore-not-found)
    :param namespace: Optional namespace to override the manifest namespace
    :return: None
    """
    try:
        # Load manifests from various sources
        manifests_to_delete, deleted_sources = self._load_manifests_from_sources(manifest_path, manifest_dict)
        # Validate and expand manifests (handles List kind and non-dict manifests)
        manifests_to_delete = self._expand_and_validate_manifests(manifests_to_delete)
        # Delete all manifests in reverse order (to handle dependencies)
        manifests_to_delete.reverse()

        total = len(manifests_to_delete)
        namespace_info = f" in namespace '{namespace}'" if namespace else ""
        logger.info(
            "Deleting %s manifest(s) from: %s%s",
            total, ', '.join(deleted_sources), namespace_info,
        )
        for position, manifest in enumerate(manifests_to_delete, start=1):
            # 'metadata:' with no value parses to None; fall back to an empty
            # mapping so the progress log line cannot raise AttributeError.
            metadata = manifest.get('metadata') or {}
            logger.info(
                "Deleting manifest %s/%s: %s/%s",
                position, total,
                manifest.get('kind', 'Unknown'), metadata.get('name', 'Unknown'),
            )
            self._delete_single_manifest(manifest=manifest, ignore_not_found=ignore_not_found, namespace=namespace)
        logger.info("Successfully deleted %s manifest(s)", total)
    except Exception as e:
        logger.error("Error deleting manifest(s): %s", str(e))
        # Bare raise re-raises the active exception as-is
        raise
def wait_for_condition(self, resource_type: str, wait_condition_type: str, namespace: str = "default",
timeout_seconds: int = 300, resource_name: str = None, wait_all: bool = False):
"""
Wait for a Kubernetes resource to meet a specific condition.
Equivalent to 'kubectl wait --for=condition=<wait_condition_type> <resource> --timeout=<timeout> -n <namespace>'
:param resource_type: Type of resource (e.g., 'deployment', 'pod', 'service')
:param wait_condition_type: Condition type to wait for (e.g., 'available', 'ready', 'progressing')
:param namespace: Namespace where the resource is located
:param timeout_seconds: Maximum time to wait in seconds
:param resource_name: Name of specific resource (None to wait for all)
:param wait_all: If True, wait for all resources of the type (equivalent to --all flag)
:return: True if condition is met, False if timeout
:raises ValueError: If wait_condition_type is invalid
"""
# Define valid condition types for different resource types
valid_conditions = {
'deployment': ['available', 'progressing', 'replicafailure', 'ready'],
'deployments': ['available', 'progressing', 'replicafailure', 'ready'],
'job': ['complete', 'failed'],
'jobs': ['complete', 'failed'],
# Add more resource types as needed
}
# Validate wait_condition_type format and type
if not wait_condition_type or not isinstance(wait_condition_type, str):
raise ValueError("wait_condition_type must be a non-empty string")
wait_condition_lower = wait_condition_type.lower().strip()
resource_type_lower = resource_type.lower()
# Check if resource type is supported
if resource_type_lower not in valid_conditions:
raise ValueError(f"Resource type '{resource_type}' is not supported for condition checking")
# Check if condition type is valid for this resource type
if wait_condition_lower not in valid_conditions[resource_type_lower]:
valid_conditions_str = ', '.join(valid_conditions[resource_type_lower])
raise ValueError(f"Invalid condition '{wait_condition_type}' for resource type '{resource_type}'. Valid conditions: {valid_conditions_str}")
try:
start_time = time.time()