NO-JIRA: harden watch loop to prevent thread exhaustion #30956
neisw wants to merge 4 commits into openshift:main from
Conversation
Pipeline controller notification: For optional jobs, comment. This repository is configured in: automatic mode
@neisw: This pull request explicitly references no jira issue.
In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the openshift-eng/jira-lifecycle-plugin repository.
Walkthrough
Adds sentinel errors, a bounded exponential retry/backoff, and wait helpers to ObserveResource; makes listAndWatchResource stricter about watch events (bookmarks ignored, closed channel → error, watch.Error validated as metav1.Status); ensures watch.Stop() via defer; and adds unit tests for watch and retry behaviors.
Sequence Diagram(s)sequenceDiagram
participant Caller as Caller / ObserveResource
participant WatchLoop as listAndWatchResource
participant K8s as Kubernetes API (List/Watch)
participant Backoff as Backoff / waitForRetry
Caller->>WatchLoop: call listAndWatchResource(ctx, resource...)
WatchLoop->>K8s: List() then Watch()
K8s-->>WatchLoop: events (Added / Modified / Deleted / Bookmark / Error / closed)
alt Added / Modified / Deleted
WatchLoop-->>Caller: emit ResourceObservation
WatchLoop-->>WatchLoop: continue reading events
else Bookmark
WatchLoop-->>WatchLoop: ignore
else Error event
WatchLoop-->>WatchLoop: validate *metav1.Status -> return errWatchErrorEvent (with status)
else closed channel
WatchLoop-->>WatchLoop: return errWatchClosed
end
WatchLoop-->>Caller: return (nil or error)
alt non-terminal error (not ctx done / not errUnexpectedObject)
Caller->>Backoff: compute delay (NotFound = 5s | exponential+jitter)
Backoff->>Caller: waitForRetry(ctx, delay)
Caller->>WatchLoop: retry
else terminal (ctx done or errUnexpectedObject)
Caller-->>Caller: return error
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 8 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (8 passed)
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/resourcewatch/observe/observe.go`:
- Around line 73-74: The code in ObserveResource is resetting retryAttempt to 0
when the watch returns nil/ends, which causes immediate reconnects if the watch
stream was closed (ResultChan() closed) rather than a clean end; modify the
logic in the watch loop (observe.go, functions handling the watch and the block
that currently sets retryAttempt = 0) to only reset retryAttempt when the watch
truly ended cleanly (e.g., explicit stop signal or terminal condition), and
treat a closed ResultChan() (receive with ok == false) as a retryable error path
that does NOT reset retryAttempt but increments backoff and returns an error so
backoff applies; apply the same change to the analogous code paths around the
other reset occurrences (the block referenced by lines ~150-153) and add a small
regression test that simulates an already-closed ResultChan() to assert backoff
is not reset.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 66e3ac20-b4e0-4ad4-935e-ed3260c92694
📒 Files selected for processing (2)
- pkg/resourcewatch/observe/observe.go
- pkg/resourcewatch/observe/observe_test.go
Scheduling required tests:

/payload-job periodic-ci-openshift-release-main-nightly-4.22-e2e-gcp-ovn-rt

@neisw: trigger 1 job(s) for the /payload-(with-prs|job|aggregate|job-with-prs|aggregate-with-prs) command
See details on https://pr-payload-tests.ci.openshift.org/runs/ci/3a7af320-30eb-11f1-8069-b05fa5978c62-0

Scheduling required tests:

/retest-required
3 similar comments

/retest-required

/retest-required

/retest-required

Job Failure Risk Analysis for sha: ba94dc5

/payload-job periodic-ci-openshift-release-main-nightly-5.0-e2e-gcp-ovn-rt

@neisw: trigger 1 job(s) for the /payload-(with-prs|job|aggregate|job-with-prs|aggregate-with-prs) command
See details on https://pr-payload-tests.ci.openshift.org/runs/ci/8a2f3510-3a6a-11f1-9128-c2c1994f1ca9-0
petr-muller left a comment
LGTM but I wonder if we could use k8s.io/apimachinery/pkg/util/wait for the retry/backoff/jitter, see inline but feel free to unhold
/lgtm
/approve
/hold
if err := listAndWatchResource(ctx, log, resourceClient, gvr, observedResources, resourceC); err != nil {
	log.Error(err, "failed to list and watch resource")
	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
Retry with backoff & jitter felt like something k8s.io/apimachinery/pkg/util/wait could provide so I asked an LLM and it told me the following:
The PR could use wait.Backoff like this:
func ObserveResource(ctx context.Context, log logr.Logger, client *dynamic.DynamicClient, gvr schema.GroupVersionResource, resourceC chan<- *ResourceObservation) {
	observedResources := make(map[types.UID]*resourceMeta)
	resourceClient := client.Resource(gvr)
	backoff := wait.Backoff{
		Duration: 500 * time.Millisecond, // minRetryDelay
		Factor:   2.0,
		Jitter:   0.25,             // ±25% jitter built-in
		Steps:    8,                // ~500ms -> 30s in 8 steps
		Cap:      30 * time.Second, // maxRetryDelay
	}
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		err := listAndWatchResource(ctx, log, resourceClient, gvr, observedResources, resourceC)
		if err == nil {
			backoff = resetBackoff() // reset on a clean cycle
			continue
		}

		// Handle NotFound specially.
		delay := notFoundRetryDelay
		if !apierrors.IsNotFound(err) {
			delay = backoff.Step()
		}

		// Wait out the delay, aborting promptly on context cancellation
		// (a bare <-time.After(delay) would ignore ctx).
		select {
		case <-ctx.Done():
			return
		case <-time.After(delay):
		}
	}
}
@neisw: This pull request explicitly references no jira issue.
In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the openshift-eng/jira-lifecycle-plugin repository.
/lgtm
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: neisw, petr-muller
The full list of commands accepted by this bot can be found here. The pull request process is described here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/resourcewatch/observe/observe.go`:
- Around line 85-88: The backoff reset code near backoff = newRetryBackoff() is
unreachable because listAndWatchResource now always returns a non-nil error
(e.g., errWatchClosed), causing the err == nil branch to never run; fix by
implementing a sustained-success reset: add a tracked timestamp or counter
(e.g., lastErrorAt time.Time or successCount int) scoped near the retry loop in
listAndWatchResource and update it on errors and on each successful event; then
when successCount >= N or time.Since(lastErrorAt) >= healthyPeriod, call backoff
= newRetryBackoff() to reset; ensure you update code paths that return
errWatchClosed so they also update lastErrorAt/successCount appropriately and
remove the dead unreachable branch or keep it only as a safeguard.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository: openshift/coderabbit/.coderabbit.yaml
Review profile: CHILL
Plan: Pro Plus
Run ID: 82d89b05-5910-49cf-8c5e-8244a39e82f8
📒 Files selected for processing (2)
- pkg/resourcewatch/observe/observe.go
- pkg/resourcewatch/observe/observe_test.go
✅ Files skipped from review due to trivial changes (1)
- pkg/resourcewatch/observe/observe_test.go
New changes are detected. LGTM label has been removed.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/resourcewatch/observe/observe.go`:
- Around line 63-82: The code resets backoff based on watchStart which is set
before calling listAndWatchResource, so slow/failing List() calls can
incorrectly count as a "healthy" watch; change the logic so the healthy-period
timer starts only after a successful Watch() (e.g., move the watchStart
timestamp into listAndWatchResource at the point client.Watch() succeeds) or
modify listAndWatchResource to return an additional boolean (e.g., enteredWatch
bool) indicating it reached the steady watch phase, then in the caller use
enteredWatch and time.Since(watchStart) (or just the boolean plus duration
measured inside listAndWatchResource) to decide whether to reset backoff
(references: listAndWatchResource, watchStart, client.Watch, maxRetryDelay,
backoff, newRetryBackoff).
- Around line 161-176: The type assertions for metav1.Status and
unstructured.Unstructured can succeed for typed-nil values; after asserting
observation.Object to (*metav1.Status) and (*unstructured.Unstructured)
(variables status and object) you must also check that the resulting pointer is
not nil before dereferencing; if nil, return fmt.Errorf("%w: %T",
errUnexpectedObject, observation.Object) (or similar) instead of proceeding to
access status.Reason/status.Message or object fields, ensuring both the
watch.Error branch and the Added/Modified/Deleted handling guard against
typed-nil objects.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository: openshift/coderabbit/.coderabbit.yaml
Review profile: CHILL
Plan: Pro Plus
Run ID: 9b708878-474d-48d6-9c2a-33896f44122c
📒 Files selected for processing (1)
pkg/resourcewatch/observe/observe.go
watchStart := time.Now()
err := listAndWatchResource(ctx, log, resourceClient, gvr, observedResources, resourceC)
if err == nil {
	continue
}

if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
	return
}
if errors.Is(err, errUnexpectedObject) {
	log.Error(err, "terminal resource watch failure")
	return
}

// If the watch ran for a healthy period before failing (e.g. a normal
// watch expiration after minutes of successful operation), reset the
// backoff so the next retry starts quickly.
if time.Since(watchStart) >= maxRetryDelay {
	backoff = newRetryBackoff()
}
Reset the backoff only after a healthy watch, not after a slow failing list.
watchStart is captured before listAndWatchResource(), so a List() that stalls for 30s+ and then errors will satisfy the "healthy period" check and drop the next retry back to the minimum delay. That weakens the new backoff exactly when the apiserver/network is unhealthy. Consider starting this timer only after client.Watch() succeeds, or have listAndWatchResource report whether it actually entered a steady watch phase before resetting.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/resourcewatch/observe/observe.go` around lines 63 - 82, The code resets
backoff based on watchStart which is set before calling listAndWatchResource, so
slow/failing List() calls can incorrectly count as a "healthy" watch; change the
logic so the healthy-period timer starts only after a successful Watch() (e.g.,
move the watchStart timestamp into listAndWatchResource at the point
client.Watch() succeeds) or modify listAndWatchResource to return an additional
boolean (e.g., enteredWatch bool) indicating it reached the steady watch phase,
then in the caller use enteredWatch and time.Since(watchStart) (or just the
boolean plus duration measured inside listAndWatchResource) to decide whether to
reset backoff (references: listAndWatchResource, watchStart, client.Watch,
maxRetryDelay, backoff, newRetryBackoff).
case watch.Error:
	status, ok := observation.Object.(*metav1.Status)
	if !ok {
		return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object)
	}
	return fmt.Errorf("%w: reason=%s message=%s", errWatchErrorEvent, status.Reason, status.Message)
case watch.Added, watch.Modified, watch.Deleted:
	// handled below
default:
	log.Info("Unhandled watch event", "type", observation.Type)
	continue
}

object, ok := observation.Object.(*unstructured.Unstructured)
if !ok {
-	return fmt.Errorf("failed to cast observation object to unstructured: %T", observation.Object)
+	return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object)
🧩 Analysis chain
🌐 Web query:
In Go, if an interface holds a typed nil pointer like (*metav1.Status)(nil), does a type assertion to *metav1.Status return ok=true with a nil result?
💡 Result:
Yes, a type assertion to *metav1.Status on an interface holding (*metav1.Status)(nil) returns ok=true with a nil result.
Citations:
- 1: https://stackoverflow.com/questions/73855021/why-type-assertion-of-an-interface-equal-to-nil-in-type-switch-non-nil-branch
- 2: https://stackoverflow.com/questions/38816843/explain-type-assertions-in-go
- 3: https://boldlygo.tech/archive/2023-12-11-type-assertion-values
- 4: https://rednafi.com/go/nil-interface-comparison/
- 5: https://stackoverflow.com/questions/30162256/convert-nil-interface-to-pointer-of-something-in-golang
🏁 Script executed:
# First, let's look at the file structure and locate the file
find . -name "observe.go" -path "*/resourcewatch/observe/*" -type f
Repository: openshift/origin
Length of output: 98
🏁 Script executed:
# Once we find it, let's examine the context around lines 161-176
if [ -f "pkg/resourcewatch/observe/observe.go" ]; then
	wc -l pkg/resourcewatch/observe/observe.go
fi
Repository: openshift/origin
Length of output: 100
🏁 Script executed:
# Get the full context around the problematic lines
if [ -f "pkg/resourcewatch/observe/observe.go" ]; then
	sed -n '140,190p' pkg/resourcewatch/observe/observe.go
fi
Repository: openshift/origin
Length of output: 1711
🏁 Script executed:
# Let's also check what the watch package is - imports at the top
if [ -f "pkg/resourcewatch/observe/observe.go" ]; then
	head -30 pkg/resourcewatch/observe/observe.go
fi
Repository: openshift/origin
Length of output: 784
🏁 Script executed:
# Look for test files related to observe
find . -name "*observe*test*.go" -o -name "*test*observe*.go" | head -20
Repository: openshift/origin
Length of output: 103
🏁 Script executed:
# Check if there are any tests that might show edge cases with nil objects
if [ -f "pkg/resourcewatch/observe/observe_test.go" ]; then
	cat -n pkg/resourcewatch/observe/observe_test.go | head -150
fi
Repository: openshift/origin
Length of output: 5707
🏁 Script executed:
# Search for any existing nil checks or similar defensive code in the codebase
rg "observation\.Object" pkg/resourcewatch/observe/ -A 3 -B 1
Repository: openshift/origin
Length of output: 1087
🏁 Script executed:
# Check if there are any comments or documentation about watch event handling
rg -A 5 "watch.Error\|watch.Event" pkg/resourcewatch/observe/observe.go
Repository: openshift/origin
Length of output: 42
Guard typed-nil watch objects before dereferencing them.
In Go, a type assertion on an interface holding a typed nil pointer like (*metav1.Status)(nil) will return ok=true with a nil result. The current code checks only ok but not whether the asserted value is nil. If observation.Object contains a typed nil, accessing fields like status.Reason or object properties would panic. Add nil checks after each successful type assertion:
Proposed fix
case watch.Error:
status, ok := observation.Object.(*metav1.Status)
- if !ok {
+ if !ok || status == nil {
return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object)
}
return fmt.Errorf("%w: reason=%s message=%s", errWatchErrorEvent, status.Reason, status.Message)
case watch.Added, watch.Modified, watch.Deleted:
// handled below
default:
log.Info("Unhandled watch event", "type", observation.Type)
continue
}
object, ok := observation.Object.(*unstructured.Unstructured)
- if !ok {
+ if !ok || object == nil {
return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object)
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Suggested code:
case watch.Error:
	status, ok := observation.Object.(*metav1.Status)
	if !ok || status == nil {
		return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object)
	}
	return fmt.Errorf("%w: reason=%s message=%s", errWatchErrorEvent, status.Reason, status.Message)
case watch.Added, watch.Modified, watch.Deleted:
	// handled below
default:
	log.Info("Unhandled watch event", "type", observation.Type)
	continue
}

object, ok := observation.Object.(*unstructured.Unstructured)
if !ok || object == nil {
	return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object)
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/resourcewatch/observe/observe.go` around lines 161 - 176, The type
assertions for metav1.Status and unstructured.Unstructured can succeed for
typed-nil values; after asserting observation.Object to (*metav1.Status) and
(*unstructured.Unstructured) (variables status and object) you must also check
that the resulting pointer is not nil before dereferencing; if nil, return
fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object) (or similar)
instead of proceeding to access status.Reason/status.Message or object fields,
ensuring both the watch.Error branch and the Added/Modified/Deleted handling
guard against typed-nil objects.
/test images

Scheduling required tests:

/test e2e-aws-ovn-serial-1of2

@neisw: The following tests failed, say
Full PR test history. Your PR dashboard.
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
Continuation of #30944
Handle watch error/bookmark events safely, always stop watch streams, and add context-aware retry backoff so reconnects do not spin and accumulate threads.
Made-with: Cursor
Summary by CodeRabbit
New Features
Bug Fixes
Tests