-
Notifications
You must be signed in to change notification settings - Fork 4.8k
NO-JIRA: harden watch loop to prevent thread exhaustion #30956
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,6 +2,7 @@ package observe | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "context" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "errors" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "fmt" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "time" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -11,6 +12,7 @@ import ( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "k8s.io/apimachinery/pkg/runtime/schema" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "k8s.io/apimachinery/pkg/types" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "k8s.io/apimachinery/pkg/util/wait" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "k8s.io/apimachinery/pkg/watch" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "k8s.io/client-go/dynamic" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -20,13 +22,36 @@ type resourceMeta struct { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| lastObserved *unstructured.Unstructured | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| errWatchClosed = errors.New("resource watch closed") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| errWatchErrorEvent = errors.New("resource watch error event") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| errUnexpectedObject = errors.New("unexpected watch object type") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| notFoundRetryDelay = 5 * time.Second | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| minRetryDelay = 500 * time.Millisecond | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| maxRetryDelay = 30 * time.Second | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func newRetryBackoff() wait.Backoff { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return wait.Backoff{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Duration: minRetryDelay, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Factor: 2.0, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Jitter: 0.5, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Steps: 8, // 500ms -> 1s -> 2s -> 4s -> 8s -> 16s -> 30s (cap); then 30s+jitter indefinitely | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Cap: maxRetryDelay, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // ObserveResource monitors a Kubernetes resource for changes | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func ObserveResource(ctx context.Context, log logr.Logger, client *dynamic.DynamicClient, gvr schema.GroupVersionResource, resourceC chan<- *ResourceObservation) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log = log.WithName("ObserveResource").WithValues("group", gvr.Group, "version", gvr.Version, "resource", gvr.Resource) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| resourceClient := client.Resource(gvr) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| observedResources := make(map[types.UID]*resourceMeta) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| backoff := newRetryBackoff() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| select { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -35,23 +60,76 @@ func ObserveResource(ctx context.Context, log logr.Logger, client *dynamic.Dynam | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| default: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if err := listAndWatchResource(ctx, log, resourceClient, gvr, observedResources, resourceC); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.Error(err, "failed to list and watch resource") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| watchStart := time.Now() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| err := listAndWatchResource(ctx, log, resourceClient, gvr, observedResources, resourceC) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if err == nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if errors.Is(err, errUnexpectedObject) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.Error(err, "terminal resource watch failure") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // If the watch ran for a healthy period before failing (e.g. a normal | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // watch expiration after minutes of successful operation), reset the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // backoff so the next retry starts quickly. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if time.Since(watchStart) >= maxRetryDelay { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| backoff = newRetryBackoff() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var retryDelay time.Duration | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if apierrors.IsNotFound(err) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retryDelay = notFoundRetryDelay | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retryDelay = backoff.Step() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.Error(err, "failed to list and watch resource", "retryReason", retryReason(err), "retryDelay", retryDelay) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if !waitForRetry(ctx, retryDelay) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
coderabbitai[bot] marked this conversation as resolved.
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func waitForRetry(ctx context.Context, delay time.Duration) bool { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| timer := time.NewTimer(delay) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| defer timer.Stop() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| select { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case <-ctx.Done(): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return false | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case <-timer.C: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return true | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func retryReason(err error) string { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| switch { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case apierrors.IsNotFound(err): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return "listNotFound" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case errors.Is(err, errWatchClosed): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return "watchClosed" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case errors.Is(err, errWatchErrorEvent): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return "watchError" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case errors.Is(err, errUnexpectedObject): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return "decodeError" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| default: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return "listOrWatchError" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.NamespaceableResourceInterface, gvr schema.GroupVersionResource, observedResources map[types.UID]*resourceMeta, resourceC chan<- *ResourceObservation) error { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| listResourceVersion, err := listResource(ctx, log, client, gvr, observedResources, resourceC) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // List returns a NotFound error if the resource doesn't exist. We | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // expect this to happen during cluster installation before CRDs are | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // admitted. Poll at 5 second intervals if this happens to avoid | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // spamming api-server or the logs. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // admitted. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if apierrors.IsNotFound(err) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.Info("Resource not found, polling") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| time.Sleep(5 * time.Second) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.Info("Resource not found") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -62,6 +140,7 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return fmt.Errorf("failed to watch resource: %w", err) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| defer resourceWatch.Stop() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| resultChan := resourceWatch.ResultChan() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -70,23 +149,38 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.N | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return ctx.Err() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case observation, ok := <-resultChan: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if !ok { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.Info("Resource watch closed") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Watch channel closed (e.g. watch expired); caller will re-list with backoff. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.Info("Watch channel closed, will retry") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return errWatchClosed | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| switch observation.Type { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case watch.Bookmark: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Bookmarks are periodic progress notifications; no state change to emit. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case watch.Error: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| status, ok := observation.Object.(*metav1.Status) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if !ok { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return fmt.Errorf("%w: reason=%s message=%s", errWatchErrorEvent, status.Reason, status.Message) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case watch.Added, watch.Modified, watch.Deleted: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // handled below | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| default: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.Info("Unhandled watch event", "type", observation.Type) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| object, ok := observation.Object.(*unstructured.Unstructured) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if !ok { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return fmt.Errorf("failed to cast observation object to unstructured: %T", observation.Object) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+161
to
+176
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🌐 Web query:
💡 Result: Yes, a type assertion to *metav1.Status on an interface holding (*metav1.Status)(nil) returns ok=true with a nil result. Citations:
🏁 Script executed: # First, let's look at the file structure and locate the file
find . -name "observe.go" -path "*/resourcewatch/observe/*" -type fRepository: openshift/origin Length of output: 98 🏁 Script executed: # Once we find it, let's examine the context around lines 161-176
if [ -f "pkg/resourcewatch/observe/observe.go" ]; then
wc -l pkg/resourcewatch/observe/observe.go
fiRepository: openshift/origin Length of output: 100 🏁 Script executed: # Get the full context around the problematic lines
if [ -f "pkg/resourcewatch/observe/observe.go" ]; then
sed -n '140,190p' pkg/resourcewatch/observe/observe.go
fiRepository: openshift/origin Length of output: 1711 🏁 Script executed: # Let's also check what the watch package is - imports at the top
if [ -f "pkg/resourcewatch/observe/observe.go" ]; then
head -30 pkg/resourcewatch/observe/observe.go
fiRepository: openshift/origin Length of output: 784 🏁 Script executed: # Look for test files related to observe
find . -name "*observe*test*.go" -o -name "*test*observe*.go" | head -20Repository: openshift/origin Length of output: 103 🏁 Script executed: # Check if there are any tests that might show edge cases with nil objects
if [ -f "pkg/resourcewatch/observe/observe_test.go" ]; then
cat -n pkg/resourcewatch/observe/observe_test.go | head -150
fiRepository: openshift/origin Length of output: 5707 🏁 Script executed: # Search for any existing nil checks or similar defensive code in the codebase
rg "observation\.Object" pkg/resourcewatch/observe/ -A 3 -B 1Repository: openshift/origin Length of output: 1087 🏁 Script executed: # Check if there are any comments or documentation about watch event handling
rg -A 5 "watch.Error\|watch.Event" pkg/resourcewatch/observe/observe.goRepository: openshift/origin Length of output: 42 Guard typed-nil watch objects before dereferencing them. In Go, a type assertion on an interface holding a typed nil pointer like Proposed fix case watch.Error:
status, ok := observation.Object.(*metav1.Status)
- if !ok {
+ if !ok || status == nil {
return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object)
}
return fmt.Errorf("%w: reason=%s message=%s", errWatchErrorEvent, status.Reason, status.Message)
case watch.Added, watch.Modified, watch.Deleted:
// handled below
default:
log.Info("Unhandled watch event", "type", observation.Type)
continue
}
object, ok := observation.Object.(*unstructured.Unstructured)
- if !ok {
+ if !ok || object == nil {
return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object)
}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| switch observation.Type { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case watch.Added: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case watch.Modified: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case watch.Added, watch.Modified: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| emitUpdate(observedResources, gvr, object, resourceC) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case watch.Deleted: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| emitDelete(observedResources, gvr, object, resourceC) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| default: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.Info("Unhandled watch event", "type", observation.Type) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reset the backoff only after a healthy watch, not after a slow failing list.
watchStartis captured beforelistAndWatchResource(), so aList()that stalls for 30s+ and then errors will satisfy the "healthy period" check and drop the next retry back to the minimum delay. That weakens the new backoff exactly when the apiserver/network is unhealthy. Consider starting this timer only afterclient.Watch()succeeds, or havelistAndWatchResourcereport whether it actually entered a steady watch phase before resetting.🤖 Prompt for AI Agents
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@coderabbitai isn't our max delay 30s? If listAndWatchResource takes 30s to respond doesn't that prevent the hot looping the delay is guarding against?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@neisw— that's a fair point. IflistAndWatchResourceitself takes ≥ 30 s to fail (e.g. a slowList()), then that elapsed time already acts as the cooldown window, so resetting the backoff to 500 ms for the next attempt doesn't create hot-looping for that cycle.The narrower concern that remains: if you get one slow (≥ 30 s) failing call that triggers the reset, followed immediately by many rapid-failing calls (e.g. the apiserver recovers connectivity but returns errors very quickly), those subsequent fast failures would start burning through the exponential ramp-up from the minimum again rather than from whatever level the backoff had reached. That said, this is a fairly unlikely compound scenario, and the dominant case (consistently slow or consistently fast failures) is handled well by the current logic.
Given that context, I'd consider this a minor concern rather than a blocking one. Happy to defer to your judgment on whether the added complexity of tracking whether the watch phase was actually entered is worth it.