Skip to content

Commit 11eb6ba

Browse files
Merge pull request #30956 from neisw/resource-watch-fixes
NO-JIRA: harden watch loop to prevent thread exhaustion
2 parents 9f23088 + ddaf89e commit 11eb6ba

2 files changed

Lines changed: 377 additions & 14 deletions

File tree

pkg/resourcewatch/observe/observe.go

Lines changed: 108 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package observe
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"time"
78

@@ -11,6 +12,7 @@ import (
1112
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
1213
"k8s.io/apimachinery/pkg/runtime/schema"
1314
"k8s.io/apimachinery/pkg/types"
15+
"k8s.io/apimachinery/pkg/util/wait"
1416
"k8s.io/apimachinery/pkg/watch"
1517
"k8s.io/client-go/dynamic"
1618
)
@@ -20,13 +22,36 @@ type resourceMeta struct {
2022
lastObserved *unstructured.Unstructured
2123
}
2224

25+
var (
26+
errWatchClosed = errors.New("resource watch closed")
27+
errWatchErrorEvent = errors.New("resource watch error event")
28+
errUnexpectedObject = errors.New("unexpected watch object type")
29+
)
30+
31+
const (
32+
notFoundRetryDelay = 5 * time.Second
33+
minRetryDelay = 500 * time.Millisecond
34+
maxRetryDelay = 30 * time.Second
35+
)
36+
37+
func newRetryBackoff() wait.Backoff {
38+
return wait.Backoff{
39+
Duration: minRetryDelay,
40+
Factor: 2.0,
41+
Jitter: 0.5,
42+
Steps: 8, // 500ms -> 1s -> 2s -> 4s -> 8s -> 16s -> 30s (cap); then 30s+jitter indefinitely
43+
Cap: maxRetryDelay,
44+
}
45+
}
46+
2347
// ObserveResource monitors a Kubernetes resource for changes
2448
func ObserveResource(ctx context.Context, log logr.Logger, client *dynamic.DynamicClient, gvr schema.GroupVersionResource, resourceC chan<- *ResourceObservation) {
2549
log = log.WithName("ObserveResource").WithValues("group", gvr.Group, "version", gvr.Version, "resource", gvr.Resource)
2650

2751
resourceClient := client.Resource(gvr)
2852

2953
observedResources := make(map[types.UID]*resourceMeta)
54+
backoff := newRetryBackoff()
3055

3156
for {
3257
select {
@@ -35,23 +60,76 @@ func ObserveResource(ctx context.Context, log logr.Logger, client *dynamic.Dynam
3560
default:
3661
}
3762

38-
if err := listAndWatchResource(ctx, log, resourceClient, gvr, observedResources, resourceC); err != nil {
39-
log.Error(err, "failed to list and watch resource")
63+
watchStart := time.Now()
64+
err := listAndWatchResource(ctx, log, resourceClient, gvr, observedResources, resourceC)
65+
if err == nil {
66+
continue
67+
}
68+
69+
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
70+
return
71+
}
72+
if errors.Is(err, errUnexpectedObject) {
73+
log.Error(err, "terminal resource watch failure")
74+
return
75+
}
76+
77+
// If the watch ran for a healthy period before failing (e.g. a normal
78+
// watch expiration after minutes of successful operation), reset the
79+
// backoff so the next retry starts quickly.
80+
if time.Since(watchStart) >= maxRetryDelay {
81+
backoff = newRetryBackoff()
82+
}
83+
84+
var retryDelay time.Duration
85+
if apierrors.IsNotFound(err) {
86+
retryDelay = notFoundRetryDelay
87+
} else {
88+
retryDelay = backoff.Step()
89+
}
90+
log.Error(err, "failed to list and watch resource", "retryReason", retryReason(err), "retryDelay", retryDelay)
91+
92+
if !waitForRetry(ctx, retryDelay) {
93+
return
4094
}
4195
}
4296
}
4397

98+
func waitForRetry(ctx context.Context, delay time.Duration) bool {
99+
timer := time.NewTimer(delay)
100+
defer timer.Stop()
101+
102+
select {
103+
case <-ctx.Done():
104+
return false
105+
case <-timer.C:
106+
return true
107+
}
108+
}
109+
110+
func retryReason(err error) string {
111+
switch {
112+
case apierrors.IsNotFound(err):
113+
return "listNotFound"
114+
case errors.Is(err, errWatchClosed):
115+
return "watchClosed"
116+
case errors.Is(err, errWatchErrorEvent):
117+
return "watchError"
118+
case errors.Is(err, errUnexpectedObject):
119+
return "decodeError"
120+
default:
121+
return "listOrWatchError"
122+
}
123+
}
124+
44125
func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.NamespaceableResourceInterface, gvr schema.GroupVersionResource, observedResources map[types.UID]*resourceMeta, resourceC chan<- *ResourceObservation) error {
45126
listResourceVersion, err := listResource(ctx, log, client, gvr, observedResources, resourceC)
46127
if err != nil {
47128
// List returns a NotFound error if the resource doesn't exist. We
48129
// expect this to happen during cluster installation before CRDs are
49-
// admitted. Poll at 5 second intervals if this happens to avoid
50-
// spamming api-server or the logs.
130+
// admitted.
51131
if apierrors.IsNotFound(err) {
52-
log.Info("Resource not found, polling")
53-
time.Sleep(5 * time.Second)
54-
return nil
132+
log.Info("Resource not found")
55133
}
56134
return err
57135
}
@@ -62,6 +140,7 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N
62140
if err != nil {
63141
return fmt.Errorf("failed to watch resource: %w", err)
64142
}
143+
defer resourceWatch.Stop()
65144

66145
resultChan := resourceWatch.ResultChan()
67146
for {
@@ -70,23 +149,38 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N
70149
return ctx.Err()
71150
case observation, ok := <-resultChan:
72151
if !ok {
73-
log.Info("Resource watch closed")
74-
return nil
152+
// Watch channel closed (e.g. watch expired); caller will re-list with backoff.
153+
log.Info("Watch channel closed, will retry")
154+
return errWatchClosed
155+
}
156+
157+
switch observation.Type {
158+
case watch.Bookmark:
159+
// Bookmarks are periodic progress notifications; no state change to emit.
160+
continue
161+
case watch.Error:
162+
status, ok := observation.Object.(*metav1.Status)
163+
if !ok {
164+
return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object)
165+
}
166+
return fmt.Errorf("%w: reason=%s message=%s", errWatchErrorEvent, status.Reason, status.Message)
167+
case watch.Added, watch.Modified, watch.Deleted:
168+
// handled below
169+
default:
170+
log.Info("Unhandled watch event", "type", observation.Type)
171+
continue
75172
}
76173

77174
object, ok := observation.Object.(*unstructured.Unstructured)
78175
if !ok {
79-
return fmt.Errorf("failed to cast observation object to unstructured: %T", observation.Object)
176+
return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object)
80177
}
81178

82179
switch observation.Type {
83-
case watch.Added:
84-
case watch.Modified:
180+
case watch.Added, watch.Modified:
85181
emitUpdate(observedResources, gvr, object, resourceC)
86182
case watch.Deleted:
87183
emitDelete(observedResources, gvr, object, resourceC)
88-
default:
89-
log.Info("Unhandled watch event", "type", observation.Type)
90184
}
91185
}
92186
}

0 commit comments

Comments
 (0)