Skip to content

Commit b38a45a

Browse files
committed
resourcewatch: harden watch loop to prevent thread exhaustion
Handle watch error/bookmark events safely, always stop watch streams, and add context-aware retry backoff so reconnects do not spin and accumulate threads. Made-with: Cursor
1 parent 78d08d3 commit b38a45a

2 files changed

Lines changed: 277 additions & 9 deletions

File tree

pkg/resourcewatch/observe/observe.go

Lines changed: 108 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package observe
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
7+
"math/rand"
68
"time"
79

810
"github.com/go-logr/logr"
@@ -20,13 +22,26 @@ type resourceMeta struct {
2022
lastObserved *unstructured.Unstructured
2123
}
2224

25+
// Sentinel errors that classify why a watch cycle ended. They are meant to be
// matched with errors.Is, including when wrapped with extra detail.
var (
	// errWatchClosed reports that the watch result channel was closed.
	errWatchClosed = errors.New("resource watch closed")
	// errWatchErrorEvent reports that the watch delivered an Error event.
	errWatchErrorEvent = errors.New("resource watch error event")
	// errUnexpectedObject reports that a watch event carried an object of a
	// type the observer could not handle.
	errUnexpectedObject = errors.New("unexpected watch object type")
)
30+
31+
const (
32+
notFoundRetryDelay = 5 * time.Second
33+
minRetryDelay = 500 * time.Millisecond
34+
maxRetryDelay = 30 * time.Second
35+
)
36+
2337
// ObserveResource monitors a Kubernetes resource for changes
2438
func ObserveResource(ctx context.Context, log logr.Logger, client *dynamic.DynamicClient, gvr schema.GroupVersionResource, resourceC chan<- *ResourceObservation) {
2539
log = log.WithName("ObserveResource").WithValues("group", gvr.Group, "version", gvr.Version, "resource", gvr.Resource)
2640

2741
resourceClient := client.Resource(gvr)
2842

2943
observedResources := make(map[types.UID]*resourceMeta)
44+
retryAttempt := 0
3045

3146
for {
3247
select {
@@ -36,8 +51,78 @@ func ObserveResource(ctx context.Context, log logr.Logger, client *dynamic.Dynam
3651
}
3752

3853
if err := listAndWatchResource(ctx, log, resourceClient, gvr, observedResources, resourceC); err != nil {
39-
log.Error(err, "failed to list and watch resource")
54+
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
55+
return
56+
}
57+
58+
retryDelay := nextRetryDelay(err, retryAttempt)
59+
log.Error(err, "failed to list and watch resource", "retryReason", retryReason(err), "retryDelay", retryDelay)
60+
61+
if !waitForRetry(ctx, retryDelay) {
62+
return
63+
}
64+
65+
retryAttempt++
66+
continue
67+
}
68+
69+
// If a watch cycle ends cleanly, start retries from the base delay.
70+
retryAttempt = 0
71+
}
72+
}
73+
74+
func nextRetryDelay(err error, retryAttempt int) time.Duration {
75+
if apierrors.IsNotFound(err) {
76+
return notFoundRetryDelay
77+
}
78+
79+
backoff := minRetryDelay
80+
for i := 0; i < retryAttempt; i++ {
81+
if backoff >= maxRetryDelay/2 {
82+
backoff = maxRetryDelay
83+
break
4084
}
85+
backoff *= 2
86+
}
87+
if backoff > maxRetryDelay {
88+
backoff = maxRetryDelay
89+
}
90+
91+
jitter := backoff / 4
92+
if jitter > 0 {
93+
jitterDelta := time.Duration(rand.Int63n(int64(2*jitter)+1)) - jitter
94+
backoff += jitterDelta
95+
}
96+
if backoff < minRetryDelay {
97+
return minRetryDelay
98+
}
99+
return backoff
100+
}
101+
102+
// waitForRetry blocks until delay has elapsed or ctx is done, whichever
// happens first. It reports true when the delay elapsed and false when the
// wait was ended by ctx.
func waitForRetry(ctx context.Context, delay time.Duration) bool {
	retryTimer := time.NewTimer(delay)
	// Release the timer on the early-exit path as well.
	defer retryTimer.Stop()

	elapsed := false
	select {
	case <-retryTimer.C:
		elapsed = true
	case <-ctx.Done():
	}
	return elapsed
}
113+
114+
func retryReason(err error) string {
115+
switch {
116+
case apierrors.IsNotFound(err):
117+
return "listNotFound"
118+
case errors.Is(err, errWatchClosed):
119+
return "watchClosed"
120+
case errors.Is(err, errWatchErrorEvent):
121+
return "watchError"
122+
case errors.Is(err, errUnexpectedObject):
123+
return "decodeError"
124+
default:
125+
return "listOrWatchError"
41126
}
42127
}
43128

@@ -46,12 +131,9 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N
46131
if err != nil {
47132
// List returns a NotFound error if the resource doesn't exist. We
48133
// expect this to happen during cluster installation before CRDs are
49-
// admitted. Poll at 5 second intervals if this happens to avoid
50-
// spamming api-server or the logs.
134+
// admitted.
51135
if apierrors.IsNotFound(err) {
52-
log.Info("Resource not found, polling")
53-
time.Sleep(5 * time.Second)
54-
return nil
136+
log.Info("Resource not found")
55137
}
56138
return err
57139
}
@@ -62,6 +144,7 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N
62144
if err != nil {
63145
return fmt.Errorf("failed to watch resource: %w", err)
64146
}
147+
defer resourceWatch.Stop()
65148

66149
resultChan := resourceWatch.ResultChan()
67150
for {
@@ -70,13 +153,29 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N
70153
return ctx.Err()
71154
case observation, ok := <-resultChan:
72155
if !ok {
73-
log.Info("Resource watch closed")
74-
return nil
156+
return errWatchClosed
157+
}
158+
159+
switch observation.Type {
160+
case watch.Bookmark:
161+
// Bookmarks are periodic progress notifications; no state change to emit.
162+
continue
163+
case watch.Error:
164+
status, ok := observation.Object.(*metav1.Status)
165+
if !ok {
166+
return fmt.Errorf("%w: %T", errWatchErrorEvent, observation.Object)
167+
}
168+
return fmt.Errorf("%w: reason=%s message=%s", errWatchErrorEvent, status.Reason, status.Message)
169+
case watch.Added, watch.Modified, watch.Deleted:
170+
// handled below
171+
default:
172+
log.Info("Unhandled watch event", "type", observation.Type)
173+
continue
75174
}
76175

77176
object, ok := observation.Object.(*unstructured.Unstructured)
78177
if !ok {
79-
return fmt.Errorf("failed to cast observation object to unstructured: %T", observation.Object)
178+
return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object)
80179
}
81180

82181
switch observation.Type {
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
package observe
2+
3+
import (
4+
"context"
5+
"errors"
6+
"strings"
7+
"testing"
8+
"time"
9+
10+
apierrors "k8s.io/apimachinery/pkg/api/errors"
11+
"k8s.io/apimachinery/pkg/apis/meta/v1"
12+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
13+
"k8s.io/apimachinery/pkg/runtime/schema"
14+
"k8s.io/apimachinery/pkg/types"
15+
"k8s.io/apimachinery/pkg/watch"
16+
"k8s.io/client-go/dynamic"
17+
"k8s.io/klog/v2"
18+
)
19+
20+
type fakeNamespaceableResource struct {
21+
listFn func(ctx context.Context, opts v1.ListOptions) (*unstructured.UnstructuredList, error)
22+
watchFn func(ctx context.Context, opts v1.ListOptions) (watch.Interface, error)
23+
}
24+
25+
func (f *fakeNamespaceableResource) Namespace(string) dynamic.ResourceInterface {
26+
return f
27+
}
28+
29+
func (f *fakeNamespaceableResource) Create(context.Context, *unstructured.Unstructured, v1.CreateOptions, ...string) (*unstructured.Unstructured, error) {
30+
panic("not implemented")
31+
}
32+
33+
func (f *fakeNamespaceableResource) Update(context.Context, *unstructured.Unstructured, v1.UpdateOptions, ...string) (*unstructured.Unstructured, error) {
34+
panic("not implemented")
35+
}
36+
37+
func (f *fakeNamespaceableResource) UpdateStatus(context.Context, *unstructured.Unstructured, v1.UpdateOptions) (*unstructured.Unstructured, error) {
38+
panic("not implemented")
39+
}
40+
41+
func (f *fakeNamespaceableResource) Delete(context.Context, string, v1.DeleteOptions, ...string) error {
42+
panic("not implemented")
43+
}
44+
45+
func (f *fakeNamespaceableResource) DeleteCollection(context.Context, v1.DeleteOptions, v1.ListOptions) error {
46+
panic("not implemented")
47+
}
48+
49+
func (f *fakeNamespaceableResource) Get(context.Context, string, v1.GetOptions, ...string) (*unstructured.Unstructured, error) {
50+
panic("not implemented")
51+
}
52+
53+
func (f *fakeNamespaceableResource) List(ctx context.Context, opts v1.ListOptions) (*unstructured.UnstructuredList, error) {
54+
return f.listFn(ctx, opts)
55+
}
56+
57+
func (f *fakeNamespaceableResource) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) {
58+
return f.watchFn(ctx, opts)
59+
}
60+
61+
func (f *fakeNamespaceableResource) Patch(context.Context, string, types.PatchType, []byte, v1.PatchOptions, ...string) (*unstructured.Unstructured, error) {
62+
panic("not implemented")
63+
}
64+
65+
func (f *fakeNamespaceableResource) Apply(context.Context, string, *unstructured.Unstructured, v1.ApplyOptions, ...string) (*unstructured.Unstructured, error) {
66+
panic("not implemented")
67+
}
68+
69+
func (f *fakeNamespaceableResource) ApplyStatus(context.Context, string, *unstructured.Unstructured, v1.ApplyOptions) (*unstructured.Unstructured, error) {
70+
panic("not implemented")
71+
}
72+
73+
type trackingWatch struct {
74+
resultC chan watch.Event
75+
stopped bool
76+
}
77+
78+
func newTrackingWatch() *trackingWatch {
79+
return &trackingWatch{
80+
resultC: make(chan watch.Event, 4),
81+
}
82+
}
83+
84+
func (w *trackingWatch) Stop() {
85+
if w.stopped {
86+
return
87+
}
88+
w.stopped = true
89+
close(w.resultC)
90+
}
91+
92+
func (w *trackingWatch) ResultChan() <-chan watch.Event {
93+
return w.resultC
94+
}
95+
96+
func TestListAndWatchResource_StopsWatchAndHandlesErrorEvent(t *testing.T) {
97+
t.Parallel()
98+
99+
resourceWatch := newTrackingWatch()
100+
resourceWatch.resultC <- watch.Event{
101+
Type: watch.Error,
102+
Object: &v1.Status{
103+
Reason: v1.StatusReasonExpired,
104+
Message: "resource version too old",
105+
},
106+
}
107+
108+
client := &fakeNamespaceableResource{
109+
listFn: func(context.Context, v1.ListOptions) (*unstructured.UnstructuredList, error) {
110+
return &unstructured.UnstructuredList{
111+
Object: map[string]interface{}{
112+
"metadata": map[string]interface{}{
113+
"resourceVersion": "123",
114+
},
115+
},
116+
}, nil
117+
},
118+
watchFn: func(context.Context, v1.ListOptions) (watch.Interface, error) {
119+
return resourceWatch, nil
120+
},
121+
}
122+
123+
err := listAndWatchResource(context.Background(), klog.NewKlogr(), client, schema.GroupVersionResource{Resource: "pods"}, map[types.UID]*resourceMeta{}, make(chan *ResourceObservation, 8))
124+
if !errors.Is(err, errWatchErrorEvent) {
125+
t.Fatalf("expected watch error event, got: %v", err)
126+
}
127+
if !strings.Contains(err.Error(), "resource version too old") {
128+
t.Fatalf("expected status message in error, got: %v", err)
129+
}
130+
if !resourceWatch.stopped {
131+
t.Fatalf("expected watch.Stop() to be called")
132+
}
133+
}
134+
135+
func TestNextRetryDelay_BackoffAndNotFound(t *testing.T) {
136+
t.Parallel()
137+
138+
notFoundWrapped := apierrors.NewNotFound(schema.GroupResource{Group: "apps", Resource: "deployments"}, "example")
139+
if got := nextRetryDelay(notFoundWrapped, 5); got != notFoundRetryDelay {
140+
t.Fatalf("expected not found retry delay %v, got %v", notFoundRetryDelay, got)
141+
}
142+
143+
first := nextRetryDelay(errors.New("watch failed"), 0)
144+
second := nextRetryDelay(errors.New("watch failed"), 1)
145+
if second <= first {
146+
t.Fatalf("expected retry delay to increase, got first=%v second=%v", first, second)
147+
}
148+
149+
maxed := nextRetryDelay(errors.New("watch failed"), 50)
150+
if maxed > maxRetryDelay {
151+
t.Fatalf("expected retry delay to cap at <= %v, got %v", maxRetryDelay, maxed)
152+
}
153+
}
154+
155+
func TestWaitForRetry_CancelledContext(t *testing.T) {
156+
t.Parallel()
157+
158+
ctx, cancel := context.WithCancel(context.Background())
159+
cancel()
160+
161+
start := time.Now()
162+
ok := waitForRetry(ctx, 2*time.Second)
163+
if ok {
164+
t.Fatalf("expected waitForRetry to abort on cancelled context")
165+
}
166+
if elapsed := time.Since(start); elapsed > 250*time.Millisecond {
167+
t.Fatalf("expected prompt cancellation, elapsed=%v", elapsed)
168+
}
169+
}

0 commit comments

Comments
 (0)