122 changes: 108 additions & 14 deletions pkg/resourcewatch/observe/observe.go
@@ -2,6 +2,7 @@ package observe
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"time"
 
@@ -11,6 +12,7 @@ import (
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/dynamic"
 )
@@ -20,13 +22,36 @@ type resourceMeta struct {
 	lastObserved *unstructured.Unstructured
 }
 
+var (
+	errWatchClosed      = errors.New("resource watch closed")
+	errWatchErrorEvent  = errors.New("resource watch error event")
+	errUnexpectedObject = errors.New("unexpected watch object type")
+)
+
+const (
+	notFoundRetryDelay = 5 * time.Second
+	minRetryDelay      = 500 * time.Millisecond
+	maxRetryDelay      = 30 * time.Second
+)
+
+func newRetryBackoff() wait.Backoff {
+	return wait.Backoff{
+		Duration: minRetryDelay,
+		Factor:   2.0,
+		Jitter:   0.5,
+		Steps:    8, // 500ms -> 1s -> 2s -> 4s -> 8s -> 16s -> 30s (cap); then 30s+jitter indefinitely
+		Cap:      maxRetryDelay,
+	}
+}
+
 // ObserveResource monitors a Kubernetes resource for changes
 func ObserveResource(ctx context.Context, log logr.Logger, client *dynamic.DynamicClient, gvr schema.GroupVersionResource, resourceC chan<- *ResourceObservation) {
 	log = log.WithName("ObserveResource").WithValues("group", gvr.Group, "version", gvr.Version, "resource", gvr.Resource)
 
 	resourceClient := client.Resource(gvr)
 
 	observedResources := make(map[types.UID]*resourceMeta)
+	backoff := newRetryBackoff()
 
 	for {
 		select {
@@ -35,23 +60,76 @@ func ObserveResource(ctx context.Context, log logr.Logger, client *dynamic.DynamicClient, gvr schema.GroupVersionResource, resourceC chan<- *ResourceObservation) {
 		default:
 		}
 
-		if err := listAndWatchResource(ctx, log, resourceClient, gvr, observedResources, resourceC); err != nil {
-			log.Error(err, "failed to list and watch resource")
+		watchStart := time.Now()
+		err := listAndWatchResource(ctx, log, resourceClient, gvr, observedResources, resourceC)
+		if err == nil {
+			continue
+		}
+
+		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+			return
+		}
+		if errors.Is(err, errUnexpectedObject) {
+			log.Error(err, "terminal resource watch failure")
+			return
+		}
+
+		// If the watch ran for a healthy period before failing (e.g. a normal
+		// watch expiration after minutes of successful operation), reset the
+		// backoff so the next retry starts quickly.
+		if time.Since(watchStart) >= maxRetryDelay {
+			backoff = newRetryBackoff()
+		}
Comment on lines +63 to +82
@coderabbitai (bot) commented on Apr 20, 2026

⚠️ Potential issue | 🟠 Major

Reset the backoff only after a healthy watch, not after a slow failing list.

watchStart is captured before listAndWatchResource(), so a List() that stalls for 30s+ and then errors will satisfy the "healthy period" check and drop the next retry back to the minimum delay. That weakens the new backoff exactly when the apiserver/network is unhealthy. Consider starting this timer only after client.Watch() succeeds, or have listAndWatchResource report whether it actually entered a steady watch phase before resetting.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/resourcewatch/observe/observe.go` around lines 63-82: the code resets
backoff based on watchStart which is set before calling listAndWatchResource, so
slow/failing List() calls can incorrectly count as a "healthy" watch; change the
logic so the healthy-period timer starts only after a successful Watch() (e.g.,
move the watchStart timestamp into listAndWatchResource at the point
client.Watch() succeeds) or modify listAndWatchResource to return an additional
boolean (e.g., enteredWatch bool) indicating it reached the steady watch phase,
then in the caller use enteredWatch and time.Since(watchStart) (or just the
boolean plus duration measured inside listAndWatchResource) to decide whether to
reset backoff (references: listAndWatchResource, watchStart, client.Watch,
maxRetryDelay, backoff, newRetryBackoff).

@neisw (Contributor, Author) replied:

@coderabbitai isn't our max delay 30s? If listAndWatchResource takes 30s to respond doesn't that prevent the hot looping the delay is guarding against?

@coderabbitai (bot) replied:

@neisw — that's a fair point. If listAndWatchResource itself takes ≥ 30 s to fail (e.g. a slow List()), then that elapsed time already acts as the cooldown window, so resetting the backoff to 500 ms for the next attempt doesn't create hot-looping for that cycle.

The narrower concern that remains: if you get one slow (≥ 30 s) failing call that triggers the reset, followed immediately by many rapid-failing calls (e.g. the apiserver recovers connectivity but returns errors very quickly), those subsequent fast failures would start burning through the exponential ramp-up from the minimum again rather than from whatever level the backoff had reached. That said, this is a fairly unlikely compound scenario, and the dominant case (consistently slow or consistently fast failures) is handled well by the current logic.

Given that context, I'd consider this a minor concern rather than a blocking one. Happy to defer to your judgment on whether the added complexity of tracking whether the watch phase was actually entered is worth it.
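
To make the suggested alternative concrete, here is a minimal standalone sketch of the caller-side check the AI prompt above describes, using a hypothetical enteredWatch flag returned by listAndWatchResource; the simulated attempt table and printed messages are illustrative assumptions, not code from this PR.

package main

import (
	"fmt"
	"time"
)

func main() {
	const maxRetryDelay = 30 * time.Second

	// Simulated outcomes of listAndWatchResource: whether the steady watch
	// phase was reached (the hypothetical enteredWatch return value) and how
	// long the attempt ran before failing.
	attempts := []struct {
		name         string
		enteredWatch bool
		ran          time.Duration
	}{
		{"List() stalled 45s, never watched", false, 45 * time.Second},
		{"watch ran 5m, then expired", true, 5 * time.Minute},
	}

	for _, a := range attempts {
		// With the PR's duration-only check, both cases would reset the
		// backoff; also requiring enteredWatch keeps the slow failing
		// List() on the exponential ramp.
		if a.enteredWatch && a.ran >= maxRetryDelay {
			fmt.Printf("%s -> reset backoff\n", a.name)
		} else {
			fmt.Printf("%s -> keep ramped backoff\n", a.name)
		}
	}
}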


+		var retryDelay time.Duration
+		if apierrors.IsNotFound(err) {
+			retryDelay = notFoundRetryDelay
+		} else {
+			retryDelay = backoff.Step()
+		}
+		log.Error(err, "failed to list and watch resource", "retryReason", retryReason(err), "retryDelay", retryDelay)
+
+		if !waitForRetry(ctx, retryDelay) {
+			return
+		}
 	}
coderabbitai[bot] marked this conversation as resolved.
 }
 
+func waitForRetry(ctx context.Context, delay time.Duration) bool {
+	timer := time.NewTimer(delay)
+	defer timer.Stop()
+
+	select {
+	case <-ctx.Done():
+		return false
+	case <-timer.C:
+		return true
+	}
+}
+
+func retryReason(err error) string {
+	switch {
+	case apierrors.IsNotFound(err):
+		return "listNotFound"
+	case errors.Is(err, errWatchClosed):
+		return "watchClosed"
+	case errors.Is(err, errWatchErrorEvent):
+		return "watchError"
+	case errors.Is(err, errUnexpectedObject):
+		return "decodeError"
+	default:
+		return "listOrWatchError"
+	}
+}

 func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.NamespaceableResourceInterface, gvr schema.GroupVersionResource, observedResources map[types.UID]*resourceMeta, resourceC chan<- *ResourceObservation) error {
 	listResourceVersion, err := listResource(ctx, log, client, gvr, observedResources, resourceC)
 	if err != nil {
 		// List returns a NotFound error if the resource doesn't exist. We
 		// expect this to happen during cluster installation before CRDs are
-		// admitted. Poll at 5 second intervals if this happens to avoid
-		// spamming api-server or the logs.
+		// admitted.
 		if apierrors.IsNotFound(err) {
-			log.Info("Resource not found, polling")
-			time.Sleep(5 * time.Second)
-			return nil
+			log.Info("Resource not found")
 		}
 		return err
 	}
@@ -62,6 +140,7 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.NamespaceableResourceInterface, gvr schema.GroupVersionResource, observedResources map[types.UID]*resourceMeta, resourceC chan<- *ResourceObservation) error {
 	if err != nil {
 		return fmt.Errorf("failed to watch resource: %w", err)
 	}
+	defer resourceWatch.Stop()
 
 	resultChan := resourceWatch.ResultChan()
 	for {
@@ -70,23 +149,38 @@ func listAndWatchResource(ctx context.Context, log logr.Logger, client dynamic.NamespaceableResourceInterface, gvr schema.GroupVersionResource, observedResources map[types.UID]*resourceMeta, resourceC chan<- *ResourceObservation) error {
 			return ctx.Err()
 		case observation, ok := <-resultChan:
 			if !ok {
-				log.Info("Resource watch closed")
-				return nil
+				// Watch channel closed (e.g. watch expired); caller will re-list with backoff.
+				log.Info("Watch channel closed, will retry")
+				return errWatchClosed
 			}
 
+			switch observation.Type {
+			case watch.Bookmark:
+				// Bookmarks are periodic progress notifications; no state change to emit.
+				continue
+			case watch.Error:
+				status, ok := observation.Object.(*metav1.Status)
+				if !ok {
+					return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object)
+				}
+				return fmt.Errorf("%w: reason=%s message=%s", errWatchErrorEvent, status.Reason, status.Message)
+			case watch.Added, watch.Modified, watch.Deleted:
+				// handled below
+			default:
+				log.Info("Unhandled watch event", "type", observation.Type)
+				continue
+			}
+
 			object, ok := observation.Object.(*unstructured.Unstructured)
 			if !ok {
-				return fmt.Errorf("failed to cast observation object to unstructured: %T", observation.Object)
+				return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object)
Comment on lines +161 to +176
@coderabbitai (bot) commented:

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🌐 Web query: In Go, if an interface holds a typed nil pointer like (*metav1.Status)(nil), does a type assertion to *metav1.Status return ok=true with a nil result?

💡 Result: Yes, a type assertion to *metav1.Status on an interface holding (*metav1.Status)(nil) returns ok=true with a nil result.

(The accompanying verification scripts located pkg/resourcewatch/observe/observe.go and observe_test.go in openshift/origin and inspected the watch-event handling around these lines.)

Guard typed-nil watch objects before dereferencing them.

In Go, a type assertion on an interface holding a typed nil pointer like (*metav1.Status)(nil) will return ok=true with a nil result. The current code checks only ok but not whether the asserted value is nil. If observation.Object contains a typed nil, accessing fields like status.Reason or object properties would panic. Add nil checks after each successful type assertion:

Proposed fix
 			case watch.Error:
 				status, ok := observation.Object.(*metav1.Status)
-				if !ok {
+				if !ok || status == nil {
 					return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object)
 				}
 				return fmt.Errorf("%w: reason=%s message=%s", errWatchErrorEvent, status.Reason, status.Message)
 			case watch.Added, watch.Modified, watch.Deleted:
 				// handled below
 			default:
 				log.Info("Unhandled watch event", "type", observation.Type)
 				continue
 			}
 
 			object, ok := observation.Object.(*unstructured.Unstructured)
-			if !ok {
+			if !ok || object == nil {
 				return fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object)
 			}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/resourcewatch/observe/observe.go` around lines 161-176: the type
assertions for metav1.Status and unstructured.Unstructured can succeed for
typed-nil values; after asserting observation.Object to (*metav1.Status) and
(*unstructured.Unstructured) (variables status and object) you must also check
that the resulting pointer is not nil before dereferencing; if nil, return
fmt.Errorf("%w: %T", errUnexpectedObject, observation.Object) (or similar)
instead of proceeding to access status.Reason/status.Message or object fields,
ensuring both the watch.Error branch and the Added/Modified/Deleted handling
guard against typed-nil objects.

 			}
 
 			switch observation.Type {
-			case watch.Added:
-			case watch.Modified:
+			case watch.Added, watch.Modified:
 				emitUpdate(observedResources, gvr, object, resourceC)
 			case watch.Deleted:
 				emitDelete(observedResources, gvr, object, resourceC)
 			default:
 				log.Info("Unhandled watch event", "type", observation.Type)
 			}
 		}
 	}
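
For context on the retry curve the newRetryBackoff comment describes, here is a small standalone program that prints successive Step() values from the same wait.Backoff configuration; output varies run to run because of jitter:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	backoff := wait.Backoff{
		Duration: 500 * time.Millisecond,
		Factor:   2.0,
		Jitter:   0.5,
		Steps:    8,
		Cap:      30 * time.Second,
	}
	// Each Step() call returns the current delay (with jitter) and advances
	// the sequence; once the cap is reached, further calls keep returning
	// roughly 30s plus jitter, matching the comment in the diff.
	for i := 0; i < 10; i++ {
		fmt.Printf("retry %d: %v\n", i+1, backoff.Step())
	}
}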