Skip to content

Commit fd007d1

Browse files
committed
resourcewatch: harden watch loop to prevent thread exhaustion
Handle watch error/bookmark events safely, always stop watch streams, and add context-aware retry backoff so reconnects do not spin and accumulate threads.

- Dispatch on event type before casting to *unstructured.Unstructured so that watch.Error (which carries *metav1.Status) and watch.Bookmark events no longer trigger a cast failure and tight retry loop.
- Route watch.Added events through emitUpdate so resources created after the initial list are observed immediately.
- Add defer resourceWatch.Stop() so all return paths release the underlying watch stream.
- Replace the bare time.Sleep with bounded exponential backoff and jitter, clamped to [500ms, 30s], using context-aware waiting for prompt cancellation.
- Propagate NotFound from listAndWatchResource as an error so retry pacing is governed centrally by ObserveResource.
- Add unit tests for error-event handling, watch cleanup, backoff bounds, context cancellation, and Added-event observation.

Made-with: Cursor
1 parent 78d08d3 commit fd007d1

2 files changed

Lines changed: 347 additions & 13 deletions

File tree

pkg/resourcewatch/observe/observe.go

Lines changed: 112 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package observe
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
7+
"math/rand"
68
"time"
79

810
"github.com/go-logr/logr"
@@ -20,13 +22,26 @@ type resourceMeta struct {
2022
lastObserved *unstructured.Unstructured
2123
}
2224

25+
// Sentinel errors produced by the watch loop. They are returned bare or
// wrapped with %w, so callers classify them with errors.Is.
var (
26+
// errWatchClosed is returned when the watch result channel is closed.
errWatchClosed = errors.New("resource watch closed")
27+
// errWatchErrorEvent is wrapped when the watch delivers a watch.Error event.
errWatchErrorEvent = errors.New("resource watch error event")
28+
// errUnexpectedObject is wrapped when an Added/Modified/Deleted event
// carries an object that is not *unstructured.Unstructured.
errUnexpectedObject = errors.New("unexpected watch object type")
29+
)
30+
31+
const (
32+
notFoundRetryDelay = 5 * time.Second
33+
minRetryDelay = 500 * time.Millisecond
34+
maxRetryDelay = 30 * time.Second
35+
)
36+
2337
// ObserveResource monitors a Kubernetes resource for changes
2438
func ObserveResource(ctx context.Context, log logr.Logger, client *dynamic.DynamicClient, gvr schema.GroupVersionResource, resourceC chan<- *ResourceObservation) {
2539
log = log.WithName("ObserveResource").WithValues("group", gvr.Group, "version", gvr.Version, "resource", gvr.Resource)
2640

2741
resourceClient := client.Resource(gvr)
2842

2943
observedResources := make(map[types.UID]*resourceMeta)
44+
retryAttempt := 0
3045

3146
for {
3247
select {
@@ -36,8 +51,81 @@ func ObserveResource(ctx context.Context, log logr.Logger, client *dynamic.Dynam
3651
}
3752

3853
if err := listAndWatchResource(ctx, log, resourceClient, gvr, observedResources, resourceC); err != nil {
39-
log.Error(err, "failed to list and watch resource")
54+
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
55+
return
56+
}
57+
58+
retryDelay := nextRetryDelay(err, retryAttempt)
59+
log.Error(err, "failed to list and watch resource", "retryReason", retryReason(err), "retryDelay", retryDelay)
60+
61+
if !waitForRetry(ctx, retryDelay) {
62+
return
63+
}
64+
65+
retryAttempt++
66+
continue
4067
}
68+
69+
// If a watch cycle ends cleanly, start retries from the base delay.
70+
retryAttempt = 0
71+
}
72+
}
73+
74+
func nextRetryDelay(err error, retryAttempt int) time.Duration {
75+
if apierrors.IsNotFound(err) {
76+
return notFoundRetryDelay
77+
}
78+
79+
backoff := minRetryDelay
80+
for i := 0; i < retryAttempt; i++ {
81+
if backoff >= maxRetryDelay/2 {
82+
backoff = maxRetryDelay
83+
break
84+
}
85+
backoff *= 2
86+
}
87+
if backoff > maxRetryDelay {
88+
backoff = maxRetryDelay
89+
}
90+
91+
jitter := backoff / 4
92+
if jitter > 0 {
93+
jitterDelta := time.Duration(rand.Int63n(int64(2*jitter)+1)) - jitter
94+
backoff += jitterDelta
95+
}
96+
if backoff < minRetryDelay {
97+
backoff = minRetryDelay
98+
}
99+
if backoff > maxRetryDelay {
100+
backoff = maxRetryDelay
101+
}
102+
return backoff
103+
}
104+
105+
// waitForRetry blocks until delay has elapsed or ctx is done, whichever
// happens first.
//
// It returns true when the full delay elapsed and the caller should retry,
// and false when ctx was cancelled (or its deadline passed) and the caller
// should stop.
//
// A context that is already done on entry always yields false, even for a
// zero or negative delay. Without the up-front check both select cases could
// be ready at once and select would choose between them at random.
func waitForRetry(ctx context.Context, delay time.Duration) bool {
	if ctx.Err() != nil {
		return false
	}

	timer := time.NewTimer(delay)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}
116+
117+
func retryReason(err error) string {
118+
switch {
119+
case apierrors.IsNotFound(err):
120+
return "listNotFound"
121+
case errors.Is(err, errWatchClosed):
122+
return "watchClosed"
123+
case errors.Is(err, errWatchErrorEvent):
124+
return "watchError"
125+
case errors.Is(err, errUnexpectedObject):
126+
return "decodeError"
127+
default:
128+
return "listOrWatchError"
41129
}
42130
}
43131

@@ -46,12 +134,9 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N
46134
if err != nil {
47135
// List returns a NotFound error if the resource doesn't exist. We
48136
// expect this to happen during cluster installation before CRDs are
49-
// admitted. Poll at 5 second intervals if this happens to avoid
50-
// spamming api-server or the logs.
137+
// admitted.
51138
if apierrors.IsNotFound(err) {
52-
log.Info("Resource not found, polling")
53-
time.Sleep(5 * time.Second)
54-
return nil
139+
log.Info("Resource not found")
55140
}
56141
return err
57142
}
@@ -62,6 +147,7 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N
62147
if err != nil {
63148
return fmt.Errorf("failed to watch resource: %w", err)
64149
}
150+
defer resourceWatch.Stop()
65151

66152
resultChan := resourceWatch.ResultChan()
67153
for {
@@ -70,23 +156,36 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N
70156
return ctx.Err()
71157
case observation, ok := <-resultChan:
72158
if !ok {
73-
log.Info("Resource watch closed")
74-
return nil
159+
return errWatchClosed
160+
}
161+
162+
switch observation.Type {
163+
case watch.Bookmark:
164+
// Bookmarks are periodic progress notifications; no state change to emit.
165+
continue
166+
case watch.Error:
167+
status, ok := observation.Object.(*metav1.Status)
168+
if !ok {
169+
return fmt.Errorf("%w: %T", errWatchErrorEvent, observation.Object)
170+
}
171+
return fmt.Errorf("%w: reason=%s message=%s", errWatchErrorEvent, status.Reason, status.Message)
172+
case watch.Added, watch.Modified, watch.Deleted:
173+
// handled below
174+
default:
175+
log.Info("Unhandled watch event", "type", observation.Type)
176+
continue
75177
}
76178

77179
object, ok := observation.Object.(*unstructured.Unstructured)
78180
if !ok {
79-
return fmt.Errorf("failed to cast observation object to unstructured: %T", observation.Object)
181+
return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object)
80182
}
81183

82184
switch observation.Type {
83-
case watch.Added:
84-
case watch.Modified:
185+
case watch.Added, watch.Modified:
85186
emitUpdate(observedResources, gvr, object, resourceC)
86187
case watch.Deleted:
87188
emitDelete(observedResources, gvr, object, resourceC)
88-
default:
89-
log.Info("Unhandled watch event", "type", observation.Type)
90189
}
91190
}
92191
}

0 commit comments

Comments
 (0)