Skip to content

Commit b5b0649

Browse files
committed
feat(upgradepipeline): avoid stale object updates in status sync
1 parent 86961eb commit b5b0649

3 files changed

Lines changed: 162 additions & 99 deletions

File tree

controllers/kafkaschema_controller.go

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
apierrors "k8s.io/apimachinery/pkg/api/errors"
1515
"k8s.io/apimachinery/pkg/api/meta"
1616
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17-
"k8s.io/client-go/util/retry"
1817
ctrl "sigs.k8s.io/controller-runtime"
1918
"sigs.k8s.io/controller-runtime/pkg/builder"
2019
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -255,11 +254,6 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha
255254
// ID is used by Observe to look up the version, which may take some time to appear.
256255
schema.Status.ID = schemaID
257256

258-
// TODO: workaround for a stale-cache race in the managed. Remove once the reconciler fix lands.
259-
if err := r.persistStatusID(ctx, schema); err != nil {
260-
return fmt.Errorf("persisting Status.ID: %w", err)
261-
}
262-
263257
if schema.Spec.CompatibilityLevel != "" {
264258
if _, err := r.avnGen.ServiceSchemaRegistrySubjectConfigPut(
265259
ctx,
@@ -339,26 +333,6 @@ func referencesEqual(desired []kafkaschemaregistry.ReferenceIn, got []kafkaschem
339333
return true
340334
}
341335

342-
// persistStatusID writes schema.Status.ID to the API server in its own status
343-
// subresource update, retrying on optimistic-concurrency conflicts.
344-
//
345-
// This is a workaround for the reconciler race issue.
346-
// It should be removed when updateStatus in the managed reconciler no longer clobbers
347-
// concurrently-written status fields.
348-
func (r *KafkaSchemaController) persistStatusID(ctx context.Context, schema *v1alpha1.KafkaSchema) error {
349-
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
350-
latest := &v1alpha1.KafkaSchema{}
351-
if err := r.Get(ctx, client.ObjectKeyFromObject(schema), latest); err != nil {
352-
return err
353-
}
354-
latest.Status.ID = schema.Status.ID
355-
if latest.Status.Conditions == nil {
356-
latest.Status.Conditions = []metav1.Condition{}
357-
}
358-
return r.Status().Update(ctx, latest)
359-
})
360-
}
361-
362336
func (r *KafkaSchemaController) Delete(ctx context.Context, schema *v1alpha1.KafkaSchema) error {
363337
// Block delete if any KafkaSchema in this namespace still imports us via kafkaSchemaRef.
364338
// Only catches kafkaSchemaRef dependents in the same namespace.

controllers/reconciler.go

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package controllers
22

33
import (
44
"context"
5+
"encoding/json"
56
"errors"
67
"fmt"
78
"maps"
@@ -17,7 +18,6 @@ import (
1718
"k8s.io/apimachinery/pkg/api/meta"
1819
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1920
"k8s.io/apimachinery/pkg/types"
20-
"k8s.io/client-go/util/retry"
2121
ctrl "sigs.k8s.io/controller-runtime"
2222
"sigs.k8s.io/controller-runtime/pkg/builder"
2323
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -232,43 +232,51 @@ func (r *Reconciler[T]) resolveK8sRefs(ctx context.Context, obj T) (requeue bool
232232
return false, nil
233233
}
234234

235-
// updateStatus persists spec/metadata and status of obj.
236-
//
237-
// KNOWN ISSUE — stale-status race:
238-
// The Status().Update below sends obj.Status verbatim. If obj was built from
239-
// a stale informer-cache snapshot, this can clobber a status field that a
240-
// concurrent reconcile pass just persisted.
241-
//
242-
// For controllers built: do not use .status as an input to control flow. Treat .status as user-observable output only.
235+
// updateStatus persists status and controller-owned readiness annotations.
243236
func (r *Reconciler[T]) updateStatus(ctx context.Context, orig v1alpha1.AivenManagedObject, obj v1alpha1.AivenManagedObject) error {
244237
if equality.Semantic.DeepEqual(orig, obj) {
245238
return nil
246239
}
247240

248-
// Order matters.
249-
// First need to update the object, and then update the status.
250-
// So dependent resources won't see READY before it has been updated with new values
251-
252-
// Now we can update the status
253-
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
254-
// When updated, object status is vanished.
255-
// So we waste a copy for that,
256-
// while the original object must already have all the fields updated in runtime
257-
// Additionally, it gets the "latest version" to resolve optimistic concurrency control conflict
258-
latest := obj.DeepCopyObject().(client.Object)
259-
if err := r.Get(ctx, types.NamespacedName{Name: latest.GetName(), Namespace: latest.GetNamespace()}, latest); err != nil {
260-
return err
241+
// Capture annotation changes before Status().Update because it may mutate obj metadata.
242+
annotations := map[string]any{}
243+
for _, key := range []string{processedGenerationAnnotation, instanceIsRunningAnnotation} {
244+
origValue, origOk := orig.GetAnnotations()[key]
245+
value, ok := obj.GetAnnotations()[key]
246+
if origOk == ok && origValue == value {
247+
continue
261248
}
262249

263-
updated := obj.DeepCopyObject().(client.Object)
264-
updated.SetResourceVersion(latest.GetResourceVersion())
265-
if err := r.Update(ctx, updated); err != nil {
266-
return err
250+
if ok {
251+
annotations[key] = value
252+
} else {
253+
annotations[key] = nil
267254
}
255+
}
256+
257+
if err := r.Status().Update(ctx, obj); err != nil {
258+
return err
259+
}
260+
261+
if len(annotations) == 0 {
262+
return nil
263+
}
268264

269-
obj.SetResourceVersion(updated.GetResourceVersion())
270-
return r.Status().Update(ctx, obj)
265+
payload, err := json.Marshal(map[string]any{
266+
"metadata": map[string]any{
267+
"resourceVersion": obj.GetResourceVersion(),
268+
"annotations": annotations,
269+
},
271270
})
271+
if err != nil {
272+
return fmt.Errorf("marshalling managed annotations patch: %w", err)
273+
}
274+
275+
if err := r.Patch(ctx, obj, client.RawPatch(types.MergePatchType, payload)); err != nil {
276+
return err
277+
}
278+
279+
return nil
272280
}
273281

274282
func (r *Reconciler[T]) newAivenClient(ctx context.Context, obj T) (avngen.Client, error) {

0 commit comments

Comments
 (0)