Skip to content

Commit 667faf9

Browse files
committed
fix: make WaitUntilNodeReady robust to watch disconnections
Replace watch-based node waiting with polling on a 5-second ticker that lists nodes each tick and checks readiness, returning when the context is cancelled. - Use List instead of Watch to avoid silent failures on network errors - Return (string, error) instead of calling t.Fatalf() - Log transient API errors for debugging - Provide clear error messages distinguishing "not found" vs "not ready"
1 parent b014d2f commit 667faf9

2 files changed

Lines changed: 39 additions & 49 deletions

File tree

e2e/kube.go

Lines changed: 36 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,17 @@ import (
66
"encoding/json"
77
"fmt"
88
"strings"
9-
"testing"
109
"time"
1110

1211
"github.com/Azure/agentbaker/e2e/config"
1312
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
14-
"github.com/stretchr/testify/require"
1513
appsv1 "k8s.io/api/apps/v1"
1614
corev1 "k8s.io/api/core/v1"
1715
v1 "k8s.io/api/core/v1"
1816
errorsk8s "k8s.io/apimachinery/pkg/api/errors"
1917
"k8s.io/apimachinery/pkg/api/resource"
2018
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2119
"k8s.io/apimachinery/pkg/util/wait"
22-
"k8s.io/apimachinery/pkg/watch"
2320
"k8s.io/client-go/kubernetes"
2421
"k8s.io/client-go/rest"
2522
"k8s.io/client-go/tools/clientcmd"
@@ -136,60 +133,51 @@ func (k *Kubeclient) WaitUntilPodRunning(ctx context.Context, namespace string,
136133
return pod, err
137134
}
138135

139-
func (k *Kubeclient) WaitUntilNodeReady(ctx context.Context, t testing.TB, vmssName string) string {
140-
startTime := time.Now()
141-
t.Logf("waiting for node %s to be ready in k8s API", vmssName)
142-
defer func() {
143-
t.Logf("waited for node %s to be ready in k8s API for %s", vmssName, time.Since(startTime))
144-
}()
145-
146-
var node *corev1.Node = nil
147-
watcher, err := k.Typed.CoreV1().Nodes().Watch(ctx, metav1.ListOptions{})
148-
require.NoError(t, err, "failed to start watching nodes")
149-
defer watcher.Stop()
150-
151-
for event := range watcher.ResultChan() {
152-
if event.Type != watch.Added && event.Type != watch.Modified {
153-
continue
154-
}
155-
156-
var nodeFromEvent *corev1.Node
157-
switch v := event.Object.(type) {
158-
case *corev1.Node:
159-
nodeFromEvent = v
136+
func (k *Kubeclient) WaitUntilNodeReady(ctx context.Context, vmssName string) (string, error) {
137+
ticker := time.NewTicker(5 * time.Second)
138+
defer ticker.Stop()
160139

161-
default:
162-
t.Logf("skipping object type %T", event.Object)
163-
continue
164-
}
140+
var foundNode *corev1.Node
165141

166-
if !strings.HasPrefix(nodeFromEvent.Name, vmssName) {
167-
continue
142+
for {
143+
nodes, err := k.Typed.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
144+
if err != nil {
145+
logf(ctx, "failed to list nodes: %v", err)
146+
} else {
147+
foundNode = nil
148+
for _, node := range nodes.Items {
149+
if strings.HasPrefix(node.Name, vmssName) {
150+
foundNode = &node
151+
if isNodeReady(&node) {
152+
return node.Name, nil
153+
}
154+
break
155+
}
156+
}
168157
}
169158

170-
// found the right node. Use it!
171-
node = nodeFromEvent
172-
nodeTaints, _ := json.Marshal(node.Spec.Taints)
173-
nodeConditions, _ := json.Marshal(node.Status.Conditions)
174-
175-
for _, cond := range node.Status.Conditions {
176-
if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
177-
t.Logf("node %s is ready. Taints: %s Conditions: %s", node.Name, string(nodeTaints), string(nodeConditions))
178-
return node.Name
159+
select {
160+
case <-ctx.Done():
161+
if foundNode != nil {
162+
return "", fmt.Errorf("node %s not ready: conditions=%v, taints=%v: %w",
163+
foundNode.Name,
164+
foundNode.Status.Conditions,
165+
foundNode.Spec.Taints,
166+
ctx.Err())
179167
}
168+
return "", fmt.Errorf("node with prefix %q not found: %w", vmssName, ctx.Err())
169+
case <-ticker.C:
180170
}
181-
182-
t.Logf("node %s is not ready. Taints: %s Conditions: %s", node.Name, string(nodeTaints), string(nodeConditions))
183171
}
172+
}
184173

185-
if node == nil {
186-
t.Fatalf("%q haven't appeared in k8s API server", vmssName)
187-
return ""
174+
func isNodeReady(node *corev1.Node) bool {
175+
for _, cond := range node.Status.Conditions {
176+
if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
177+
return true
178+
}
188179
}
189-
190-
nodeString, _ := json.Marshal(node)
191-
t.Fatalf("failed to wait for %q (%s) to be ready %+v. Detail: %s", vmssName, node.Name, node.Status, string(nodeString))
192-
return node.Name
180+
return false
193181
}
194182

195183
// GetHostNetworkDebugPod returns a pod that's a member of the 'debug' daemonset, running on an aks-nodepool node.

e2e/test_helpers.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,9 @@ func prepareAKSNode(ctx context.Context, s *Scenario) *ScenarioVM {
269269
if !s.Config.SkipDefaultValidation {
270270
vmssCreatedAt := time.Now() // Record the start time
271271
creationElapse := time.Since(start) // Calculate the elapsed time
272-
scenarioVM.KubeName = s.Runtime.Cluster.Kube.WaitUntilNodeReady(ctx, s.T, s.Runtime.VMSSName)
272+
kubeName, err := s.Runtime.Cluster.Kube.WaitUntilNodeReady(ctx, s.Runtime.VMSSName)
273+
require.NoError(s.T, err)
274+
scenarioVM.KubeName = kubeName
273275
readyElapse := time.Since(vmssCreatedAt) // Calculate the elapsed time
274276
totalElapse := time.Since(start)
275277
toolkit.LogDuration(ctx, totalElapse, 3*time.Minute, fmt.Sprintf("Node %s took %s to be created and %s to be ready", s.Runtime.VMSSName, toolkit.FormatDuration(creationElapse), toolkit.FormatDuration(readyElapse)))

0 commit comments

Comments
 (0)