Skip to content

Commit e44e6f7

Browse files
authored
fix(reconciler): avoid stale object updates in status sync (#1188)
1 parent d470438 commit e44e6f7

6 files changed

Lines changed: 619 additions & 112 deletions

File tree

controllers/kafkaschema_controller.go

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
apierrors "k8s.io/apimachinery/pkg/api/errors"
1515
"k8s.io/apimachinery/pkg/api/meta"
1616
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17-
"k8s.io/client-go/util/retry"
1817
ctrl "sigs.k8s.io/controller-runtime"
1918
"sigs.k8s.io/controller-runtime/pkg/builder"
2019
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -255,11 +254,6 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha
255254
// ID is used by Observe to look up the version, which may take some time to appear.
256255
schema.Status.ID = schemaID
257256

258-
// TODO: workaround for a stale-cache race in the managed reconciler. Remove once the reconciler fix lands.
259-
if err := r.persistStatusID(ctx, schema); err != nil {
260-
return fmt.Errorf("persisting Status.ID: %w", err)
261-
}
262-
263257
if schema.Spec.CompatibilityLevel != "" {
264258
if _, err := r.avnGen.ServiceSchemaRegistrySubjectConfigPut(
265259
ctx,
@@ -339,26 +333,6 @@ func referencesEqual(desired []kafkaschemaregistry.ReferenceIn, got []kafkaschem
339333
return true
340334
}
341335

342-
// persistStatusID writes schema.Status.ID to the API server in its own status
343-
// subresource update, retrying on optimistic-concurrency conflicts.
344-
//
345-
// This is a workaround for the reconciler race issue.
346-
// It should be removed when updateStatus in the managed reconciler no longer clobbers
347-
// concurrently-written status fields.
348-
func (r *KafkaSchemaController) persistStatusID(ctx context.Context, schema *v1alpha1.KafkaSchema) error {
349-
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
350-
latest := &v1alpha1.KafkaSchema{}
351-
if err := r.Get(ctx, client.ObjectKeyFromObject(schema), latest); err != nil {
352-
return err
353-
}
354-
latest.Status.ID = schema.Status.ID
355-
if latest.Status.Conditions == nil {
356-
latest.Status.Conditions = []metav1.Condition{}
357-
}
358-
return r.Status().Update(ctx, latest)
359-
})
360-
}
361-
362336
func (r *KafkaSchemaController) Delete(ctx context.Context, schema *v1alpha1.KafkaSchema) error {
363337
// Block delete if any KafkaSchema in this namespace still imports us via kafkaSchemaRef.
364338
// Only catches kafkaSchemaRef dependents in the same namespace.

controllers/reconciler.go

Lines changed: 52 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package controllers
22

33
import (
44
"context"
5+
"encoding/json"
56
"errors"
67
"fmt"
78
"maps"
@@ -17,7 +18,6 @@ import (
1718
"k8s.io/apimachinery/pkg/api/meta"
1819
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1920
"k8s.io/apimachinery/pkg/types"
20-
"k8s.io/client-go/util/retry"
2121
ctrl "sigs.k8s.io/controller-runtime"
2222
"sigs.k8s.io/controller-runtime/pkg/builder"
2323
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -87,6 +87,15 @@ func (r *Reconciler[T]) WithIndexes(fns ...func(context.Context, ctrl.Manager) e
8787
// requeueTimeout is the delay after which the controller requeues a reconcile request.
8888
const requeueTimeout = 10 * time.Second
8989

90+
type managedAnnotationsPatchPayload struct {
91+
Metadata managedAnnotationsPatchMetadata `json:"metadata"`
92+
}
93+
94+
type managedAnnotationsPatchMetadata struct {
95+
ResourceVersion string `json:"resourceVersion"`
96+
Annotations map[string]any `json:"annotations"`
97+
}
98+
9099
// Reconcile performs the full reconciliation loop for a managed resource.
91100
func (r *Reconciler[T]) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
92101
obj := r.newObj()
@@ -120,7 +129,7 @@ func (r *Reconciler[T]) Reconcile(ctx context.Context, req ctrl.Request) (res ct
120129

121130
orig := obj.DeepCopyObject().(v1alpha1.AivenManagedObject)
122131
defer func() {
123-
err = errors.Join(err, r.updateStatus(ctx, orig, obj))
132+
err = errors.Join(err, r.persistReconcileState(ctx, orig, obj))
124133
}()
125134

126135
meta.SetStatusCondition(obj.Conditions(), getInitializedCondition("Preconditions", "Checking preconditions"))
@@ -232,43 +241,51 @@ func (r *Reconciler[T]) resolveK8sRefs(ctx context.Context, obj T) (requeue bool
232241
return false, nil
233242
}
234243

235-
// updateStatus persists spec/metadata and status of obj.
236-
//
237-
// KNOWN ISSUE — stale-status race:
238-
// The Status().Update below sends obj.Status verbatim. If obj was built from
239-
// a stale informer-cache snapshot, this can clobber a status field that a
240-
// concurrent reconcile pass just persisted.
241-
//
242-
// Controllers built on this reconciler must not use .status as an input to control flow; treat .status as user-observable output only.
243-
func (r *Reconciler[T]) updateStatus(ctx context.Context, orig v1alpha1.AivenManagedObject, obj v1alpha1.AivenManagedObject) error {
244+
// persistReconcileState persists status and controller-owned readiness annotations.
245+
func (r *Reconciler[T]) persistReconcileState(ctx context.Context, orig v1alpha1.AivenManagedObject, obj v1alpha1.AivenManagedObject) error {
244246
if equality.Semantic.DeepEqual(orig, obj) {
245247
return nil
246248
}
247249

248-
// Order matters.
249-
// First we need to update the object, and only then update the status,
250-
// so that dependent resources won't see READY before the object has been updated with the new values.
251-
252-
// Now we can update the status
253-
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
254-
// When updated, object status is vanished.
255-
// So we waste a copy for that,
256-
// while the original object must already have all the fields updated in runtime
257-
// Additionally, it gets the "latest version" to resolve optimistic concurrency control conflict
258-
latest := obj.DeepCopyObject().(client.Object)
259-
if err := r.Get(ctx, types.NamespacedName{Name: latest.GetName(), Namespace: latest.GetNamespace()}, latest); err != nil {
260-
return err
250+
// Capture annotation changes before Status().Update because it may mutate obj metadata.
251+
annotations := map[string]any{}
252+
for _, key := range []string{processedGenerationAnnotation, instanceIsRunningAnnotation} {
253+
origValue, origOk := orig.GetAnnotations()[key]
254+
value, ok := obj.GetAnnotations()[key]
255+
if origOk == ok && origValue == value {
256+
continue
261257
}
262258

263-
updated := obj.DeepCopyObject().(client.Object)
264-
updated.SetResourceVersion(latest.GetResourceVersion())
265-
if err := r.Update(ctx, updated); err != nil {
266-
return err
259+
if ok {
260+
annotations[key] = value
261+
} else {
262+
annotations[key] = nil
267263
}
264+
}
265+
266+
if err := r.Status().Update(ctx, obj); err != nil {
267+
return err
268+
}
269+
270+
if len(annotations) == 0 {
271+
return nil
272+
}
268273

269-
obj.SetResourceVersion(updated.GetResourceVersion())
270-
return r.Status().Update(ctx, obj)
274+
payload, err := json.Marshal(managedAnnotationsPatchPayload{
275+
Metadata: managedAnnotationsPatchMetadata{
276+
ResourceVersion: obj.GetResourceVersion(),
277+
Annotations: annotations,
278+
},
271279
})
280+
if err != nil {
281+
return fmt.Errorf("marshalling managed annotations patch: %w", err)
282+
}
283+
284+
if err := r.Patch(ctx, obj, client.RawPatch(types.MergePatchType, payload)); err != nil {
285+
return err
286+
}
287+
288+
return nil
272289
}
273290

274291
func (r *Reconciler[T]) newAivenClient(ctx context.Context, obj T) (avngen.Client, error) {
@@ -461,7 +478,7 @@ func (r *Reconciler[T]) reconcileDeletion(ctx context.Context, obj T) (ctrl.Resu
461478
meta.SetStatusCondition(obj.Conditions(), getErrorCondition(errConditionDelete, errors.New(msg)))
462479
return ctrl.Result{}, errors.Join(
463480
fmt.Errorf("unable to delete instance: %s", msg),
464-
r.updateStatus(ctx, orig, obj),
481+
r.persistReconcileState(ctx, orig, obj),
465482
)
466483
}
467484

@@ -484,7 +501,7 @@ func (r *Reconciler[T]) handleDeleteError(ctx context.Context, orig v1alpha1.Aiv
484501
// be retried once dependencies are removed, but do not surface this as a hard error.
485502
if errors.Is(err, v1alpha1.ErrDeleteDependencies) {
486503
logr.FromContextOrDiscard(ctx).Info("object has dependencies, requeue delete", "apiError", err)
487-
return ctrl.Result{RequeueAfter: requeueTimeout}, r.updateStatus(ctx, orig, obj)
504+
return ctrl.Result{RequeueAfter: requeueTimeout}, r.persistReconcileState(ctx, orig, obj)
488505
}
489506

490507
// If the deletion failed, don't remove the finalizer so that we can retry during the next reconciliation.
@@ -493,18 +510,18 @@ func (r *Reconciler[T]) handleDeleteError(ctx context.Context, orig v1alpha1.Aiv
493510
r.Recorder.Event(obj, corev1.EventTypeWarning, eventUnableToDeleteAtAiven, err.Error())
494511
return ctrl.Result{}, errors.Join(
495512
fmt.Errorf("unable to delete instance at Aiven: %w", err),
496-
r.updateStatus(ctx, orig, obj),
513+
r.persistReconcileState(ctx, orig, obj),
497514
)
498515
case isServerError(err):
499516
// If failed to delete due to a transient server error, keep the finalizer
500517
// and trigger a soft requeue so that deletion can be retried.
501518
logr.FromContextOrDiscard(ctx).Info("unable to delete instance at Aiven, will requeue delete", "apiError", err)
502-
return ctrl.Result{RequeueAfter: requeueTimeout}, r.updateStatus(ctx, orig, obj)
519+
return ctrl.Result{RequeueAfter: requeueTimeout}, r.persistReconcileState(ctx, orig, obj)
503520
default:
504521
r.Recorder.Event(obj, corev1.EventTypeWarning, eventUnableToDelete, err.Error())
505522
return ctrl.Result{}, errors.Join(
506523
fmt.Errorf("unable to delete instance: %w", err),
507-
r.updateStatus(ctx, orig, obj),
524+
r.persistReconcileState(ctx, orig, obj),
508525
)
509526
}
510527
}

0 commit comments

Comments
 (0)