@@ -3,11 +3,14 @@ package killer
33import (
44 "context"
55 "encoding/json"
6+ "fmt"
7+ "time"
68
79 "github.com/rs/zerolog/log"
810
911 "github.com/p-program/kube-killer/core"
1012 v1 "k8s.io/api/core/v1"
13+ policyv1 "k8s.io/api/policy/v1"
1114 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1215 "k8s.io/apimachinery/pkg/types"
1316 "k8s.io/apimachinery/pkg/util/strategicpatch"
@@ -54,78 +57,211 @@ func (k *NodeKiller) Kill() error {
5457 if err != nil {
5558 panic (err .Error ())
5659 }
60+ ctx := context .TODO ()
61+
5762 if ! k .mafia {
58- //kubectl cordon $node
63+ // Step 1: kubectl cordon $node - Mark node as unschedulable
5964 log .Info ().Msgf ("kubectl cordon %s" , k .nodeName )
60- getOption := metav1.GetOptions {}
61- node , err := clientset .CoreV1 ().Nodes ().Get (context .TODO (), k .nodeName , getOption )
62- if err != nil {
63- return err
64- }
65- oldData , err := json .Marshal (node )
66- if err != nil {
67- return err
68- }
69- node .Spec .Unschedulable = true
70- newData , err := json .Marshal (node )
65+ err = k .cordonNode (clientset , ctx )
7166 if err != nil {
72- return err
67+ return fmt . Errorf ( "failed to cordon node %s: %w" , k . nodeName , err )
7368 }
74- patchBytes , patchErr := strategicpatch .CreateTwoWayMergePatch (oldData , newData , node )
75- if patchErr != nil {
76- return patchErr
77- }
78- patchOptions := metav1.PatchOptions {}
79- if k .dryRun {
80- patchOptions .DryRun = []string {metav1 .DryRunAll }
81- }
82- _ , err = clientset .CoreV1 ().Nodes ().Patch (context .TODO (), k .nodeName , types .StrategicMergePatchType , patchBytes , patchOptions )
83- if err != nil {
84- return err
85- }
86- // Drain all pods from the node except DaemonSet pods
87- // Similar to: kubectl drain $node --ignore-daemonsets
88- err = k .drainNodePods (clientset )
69+
70+ // Step 2: kubectl drain $node --ignore-daemonsets - Evict all pods except DaemonSet pods
71+ log .Info ().Msgf ("kubectl drain %s --ignore-daemonsets" , k .nodeName )
72+ err = k .drainNodePods (clientset , ctx )
8973 if err != nil {
90- return err
74+ return fmt . Errorf ( "failed to drain node %s: %w" , k . nodeName , err )
9175 }
9276 }
93- //kubectl delete $node
77+
78+ // Step 3: kubectl delete $node - Delete the node
79+ log .Info ().Msgf ("kubectl delete node %s" , k .nodeName )
80+ err = k .deleteNode (clientset , ctx )
81+ if err != nil {
82+ return fmt .Errorf ("failed to delete node %s: %w" , k .nodeName , err )
83+ }
84+
85+ log .Info ().Msgf ("Successfully completed node deletion process for %s" , k .nodeName )
9486 return nil
9587}
9688
89+ // cordonNode marks the node as unschedulable
90+ func (k * NodeKiller ) cordonNode (clientset * kubernetes.Clientset , ctx context.Context ) error {
91+ getOption := metav1.GetOptions {}
92+ node , err := clientset .CoreV1 ().Nodes ().Get (ctx , k .nodeName , getOption )
93+ if err != nil {
94+ return err
95+ }
96+
97+ // Check if already cordoned
98+ if node .Spec .Unschedulable {
99+ log .Info ().Msgf ("Node %s is already cordoned" , k .nodeName )
100+ return nil
101+ }
102+
103+ oldData , err := json .Marshal (node )
104+ if err != nil {
105+ return err
106+ }
107+ node .Spec .Unschedulable = true
108+ newData , err := json .Marshal (node )
109+ if err != nil {
110+ return err
111+ }
112+
113+ patchBytes , patchErr := strategicpatch .CreateTwoWayMergePatch (oldData , newData , node )
114+ if patchErr != nil {
115+ return patchErr
116+ }
117+
118+ patchOptions := metav1.PatchOptions {}
119+ if k .dryRun {
120+ patchOptions .DryRun = []string {metav1 .DryRunAll }
121+ log .Info ().Msgf ("[DRY RUN] Would cordon node %s" , k .nodeName )
122+ return nil
123+ }
124+
125+ _ , err = clientset .CoreV1 ().Nodes ().Patch (ctx , k .nodeName , types .StrategicMergePatchType , patchBytes , patchOptions )
126+ return err
127+ }
128+
97129// drainNodePods evicts all pods from the node except DaemonSet pods
98130// Similar to: kubectl drain $node --ignore-daemonsets
99- func (k * NodeKiller ) drainNodePods (clientset * kubernetes.Clientset ) error {
131+ func (k * NodeKiller ) drainNodePods (clientset * kubernetes.Clientset , ctx context. Context ) error {
100132 log .Info ().Msgf ("Draining pods from node %s (ignoring DaemonSet pods)" , k .nodeName )
101133
102134 // Get all pods on this node
103135 fieldSelector := "spec.nodeName=" + k .nodeName
104- pods , err := clientset .CoreV1 ().Pods ("" ).List (context . TODO () , metav1.ListOptions {
136+ pods , err := clientset .CoreV1 ().Pods ("" ).List (ctx , metav1.ListOptions {
105137 FieldSelector : fieldSelector ,
106138 })
107139 if err != nil {
108140 return err
109141 }
110142
111- evictedCount := 0
143+ if len (pods .Items ) == 0 {
144+ log .Info ().Msgf ("No pods found on node %s" , k .nodeName )
145+ return nil
146+ }
147+
148+ // First pass: Evict all non-DaemonSet pods
149+ evictedPods := make ([]v1.Pod , 0 )
112150 for _ , pod := range pods .Items {
113151 // Check if pod belongs to a DaemonSet
114152 if k .isDaemonSetPod (pod ) {
115153 log .Info ().Msgf ("Skipping DaemonSet pod %s/%s" , pod .Namespace , pod .Name )
116154 continue
117155 }
118156
157+ // Skip pods that are already terminating
158+ if pod .DeletionTimestamp != nil {
159+ log .Info ().Msgf ("Pod %s/%s is already terminating, skipping" , pod .Namespace , pod .Name )
160+ evictedPods = append (evictedPods , pod )
161+ continue
162+ }
163+
119164 log .Info ().Msgf ("Evicting pod %s/%s from node %s" , pod .Namespace , pod .Name , k .nodeName )
120- err = clientset . CoreV1 (). Pods ( pod . Namespace ). Delete ( context . TODO (), pod . Name , k . deleteOption )
165+ err = k . evictPod ( clientset , ctx , pod )
121166 if err != nil {
122167 log .Error ().Err (err ).Msgf ("Failed to evict pod %s/%s" , pod .Namespace , pod .Name )
168+ // Continue with other pods even if one fails
123169 continue
124170 }
125- evictedCount ++
171+ evictedPods = append (evictedPods , pod )
172+ }
173+
174+ // Second pass: Wait for all evicted pods to terminate
175+ if len (evictedPods ) > 0 {
176+ log .Info ().Msgf ("Waiting for %d pods to terminate on node %s" , len (evictedPods ), k .nodeName )
177+ err = k .waitForPodsToTerminate (clientset , ctx , evictedPods )
178+ if err != nil {
179+ log .Warn ().Err (err ).Msgf ("Some pods may not have terminated gracefully" )
180+ }
181+ }
182+
183+ log .Info ().Msgf ("Successfully evicted %d pods from node %s" , len (evictedPods ), k .nodeName )
184+ return nil
185+ }
186+
187+ // evictPod evicts a pod using the Eviction API
188+ func (k * NodeKiller ) evictPod (clientset * kubernetes.Clientset , ctx context.Context , pod v1.Pod ) error {
189+ eviction := & policyv1.Eviction {
190+ ObjectMeta : metav1.ObjectMeta {
191+ Name : pod .Name ,
192+ Namespace : pod .Namespace ,
193+ },
194+ }
195+
196+ if k .dryRun {
197+ log .Info ().Msgf ("[DRY RUN] Would evict pod %s/%s" , pod .Namespace , pod .Name )
198+ return nil
199+ }
200+
201+ // Use PolicyV1 Evictions API which respects PodDisruptionBudgets and allows graceful termination
202+ err := clientset .PolicyV1 ().Evictions (pod .Namespace ).Evict (ctx , eviction )
203+ if err != nil {
204+ return fmt .Errorf ("failed to evict pod %s/%s: %w" , pod .Namespace , pod .Name , err )
205+ }
206+
207+ return nil
208+ }
209+
210+ // waitForPodsToTerminate waits for all pods to be terminated
211+ func (k * NodeKiller ) waitForPodsToTerminate (clientset * kubernetes.Clientset , ctx context.Context , pods []v1.Pod ) error {
212+ timeout := 5 * time .Minute
213+ interval := 5 * time .Second
214+
215+ startTime := time .Now ()
216+ for time .Since (startTime ) < timeout {
217+ allTerminated := true
218+ remainingPods := 0
219+
220+ for _ , pod := range pods {
221+ // Check if pod still exists
222+ currentPod , err := clientset .CoreV1 ().Pods (pod .Namespace ).Get (ctx , pod .Name , metav1.GetOptions {})
223+ if err != nil {
224+ // Pod doesn't exist anymore, consider it terminated
225+ continue
226+ }
227+
228+ // Check if pod is terminated
229+ if currentPod .DeletionTimestamp == nil && currentPod .Status .Phase != v1 .PodSucceeded && currentPod .Status .Phase != v1 .PodFailed {
230+ allTerminated = false
231+ remainingPods ++
232+ }
233+ }
234+
235+ if allTerminated {
236+ log .Info ().Msgf ("All pods have been terminated on node %s" , k .nodeName )
237+ return nil
238+ }
239+
240+ if remainingPods > 0 {
241+ log .Info ().Msgf ("Waiting for %d pods to terminate on node %s..." , remainingPods , k .nodeName )
242+ }
243+
244+ time .Sleep (interval )
245+ }
246+
247+ return fmt .Errorf ("timeout waiting for pods to terminate on node %s" , k .nodeName )
248+ }
249+
250+ // deleteNode deletes the node from the cluster
251+ func (k * NodeKiller ) deleteNode (clientset * kubernetes.Clientset , ctx context.Context ) error {
252+ deleteOptions := metav1.DeleteOptions {}
253+ if k .dryRun {
254+ deleteOptions .DryRun = []string {metav1 .DryRunAll }
255+ log .Info ().Msgf ("[DRY RUN] Would delete node %s" , k .nodeName )
256+ return nil
257+ }
258+
259+ err := clientset .CoreV1 ().Nodes ().Delete (ctx , k .nodeName , deleteOptions )
260+ if err != nil {
261+ return fmt .Errorf ("failed to delete node %s: %w" , k .nodeName , err )
126262 }
127263
128- log .Info ().Msgf ("Evicted %d pods from node %s" , evictedCount , k .nodeName )
264+ log .Info ().Msgf ("Node %s deletion initiated" , k .nodeName )
129265 return nil
130266}
131267
0 commit comments