Skip to content

Commit b1b4b14

Browse files
committed
chore(kafka_schema): remove status from control flow
1 parent d470438 commit b1b4b14

2 files changed

Lines changed: 159 additions & 203 deletions

File tree

controllers/kafkaschema_controller.go

Lines changed: 64 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ package controllers
44

55
import (
66
"context"
7+
"crypto/sha256"
8+
"encoding/hex"
9+
"encoding/json"
710
"errors"
811
"fmt"
912
"sort"
@@ -14,7 +17,6 @@ import (
1417
apierrors "k8s.io/apimachinery/pkg/api/errors"
1518
"k8s.io/apimachinery/pkg/api/meta"
1619
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17-
"k8s.io/client-go/util/retry"
1820
ctrl "sigs.k8s.io/controller-runtime"
1921
"sigs.k8s.io/controller-runtime/pkg/builder"
2022
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -26,6 +28,10 @@ import (
2628
"github.com/aiven/aiven-operator/api/v1alpha1"
2729
)
2830

31+
// kafkaSchemaAppliedFingerprintAnnotation stores a hash of the last
32+
// schema body + resolved references + compatibility level.
33+
const kafkaSchemaAppliedFingerprintAnnotation = "controllers.aiven.io/kafka-schema-applied"
34+
2935
// kafkaSchemaRefIndex is the cache index key for finding KafkaSchemas that
3036
// reference another KafkaSchema by name.
3137
const kafkaSchemaRefIndex = "spec.references.kafkaSchemaRef.name"
@@ -119,6 +125,8 @@ func kafkaSchemaVersionChangedPredicate() predicate.Predicate {
119125
}
120126
}
121127

128+
// Observe decides whether the registry already serves what the spec describes.
129+
// Drift detection is driven by an annotation fingerprint of the last applied schema.
122130
func (r *KafkaSchemaController) Observe(ctx context.Context, schema *v1alpha1.KafkaSchema) (Observation, error) {
123131
if _, err := getServiceIfOperational(ctx, r.avnGen, schema.Spec.Project, schema.Spec.ServiceName); err != nil {
124132
return Observation{}, err
@@ -135,61 +143,44 @@ func (r *KafkaSchemaController) Observe(ctx context.Context, schema *v1alpha1.Ka
135143
// The service is operational but the schema registry may not yet be ready.
136144
return Observation{}, fmt.Errorf("%w: schema registry not ready", errPreconditionNotMet)
137145
case isNotFound(err):
138-
// Subject is not registered yet
146+
// Subject is not registered yet.
139147
return Observation{ResourceExists: false}, nil
140148
case err != nil:
141149
return Observation{}, fmt.Errorf("listing Kafka Schema versions: %w", err)
142150
}
143151

144-
if schema.Status.ID == 0 {
145-
// No ID tracked yet, fall through to Create; it is idempotent.
146-
return Observation{ResourceExists: false}, nil
147-
}
148-
149-
for _, v := range versions {
150-
got, err := r.avnGen.ServiceSchemaRegistrySubjectVersionGet(
151-
ctx,
152-
schema.Spec.Project,
153-
schema.Spec.ServiceName,
154-
schema.Spec.SubjectName,
155-
v,
156-
)
157-
if err != nil {
158-
return Observation{}, fmt.Errorf("getting Kafka Schema version %d: %w", v, err)
159-
}
160-
161-
if got.Id != schema.Status.ID {
162-
continue
163-
}
164-
165-
schema.Status.Version = got.Version
166-
167-
// A kafkaSchemaRef referent can advance without spec changing.
168-
// Compare what spec.references currently resolves to.
169-
desired, err := r.resolveReferences(ctx, schema)
152+
var resolvedRefs []kafkaschemaregistry.ReferenceIn
153+
if len(schema.Spec.References) > 0 {
154+
refs, err := r.resolveReferences(ctx, schema)
170155
switch {
171156
case errors.Is(err, errPreconditionNotMet):
172157
// Referent exists but its Status.Version is still 0 — soft-requeue.
173158
return Observation{}, err
174159
case err != nil:
175-
return Observation{}, fmt.Errorf("resolving desired references: %w", err)
176-
}
177-
if !referencesEqual(desired, got.References) {
178-
return Observation{ResourceExists: true, ResourceUpToDate: false}, nil
160+
return Observation{}, fmt.Errorf("resolving references: %w", err)
179161
}
162+
resolvedRefs = refs
163+
}
164+
desiredFP := fingerprintSchema(schema, resolvedRefs)
180165

181-
meta.SetStatusCondition(&schema.Status.Conditions,
182-
getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side"))
183-
metav1.SetMetaDataAnnotation(&schema.ObjectMeta, instanceIsRunningAnnotation, "true")
166+
appliedFP := schema.GetAnnotations()[kafkaSchemaAppliedFingerprintAnnotation]
167+
if appliedFP != desiredFP {
168+
return Observation{ResourceExists: true, ResourceUpToDate: false}, nil
169+
}
184170

185-
return Observation{
186-
ResourceExists: true,
187-
ResourceUpToDate: hasLatestGeneration(schema),
188-
}, nil
171+
if len(versions) > 0 {
172+
sort.Slice(versions, func(i, j int) bool { return versions[i] > versions[j] })
173+
schema.Status.Version = versions[0]
189174
}
190175

191-
// Tracked version is not visible yet.
192-
return Observation{}, fmt.Errorf("%w: tracked schema ID %d not visible in registry", errPreconditionNotMet, schema.Status.ID)
176+
meta.SetStatusCondition(&schema.Status.Conditions,
177+
getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side"))
178+
metav1.SetMetaDataAnnotation(&schema.ObjectMeta, instanceIsRunningAnnotation, "true")
179+
180+
return Observation{
181+
ResourceExists: true,
182+
ResourceUpToDate: hasLatestGeneration(schema),
183+
}, nil
193184
}
194185

195186
func (r *KafkaSchemaController) Create(ctx context.Context, schema *v1alpha1.KafkaSchema) (CreateResult, error) {
@@ -233,11 +224,13 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha
233224
SchemaType: schema.Spec.SchemaType,
234225
}
235226

227+
var resolvedRefs []kafkaschemaregistry.ReferenceIn
236228
if len(schema.Spec.References) > 0 {
237229
refs, err := r.resolveReferences(ctx, schema)
238230
if err != nil {
239231
return err
240232
}
233+
resolvedRefs = refs
241234
postIn.References = &refs
242235
}
243236

@@ -252,14 +245,10 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha
252245
return fmt.Errorf("cannot add Kafka Schema Subject: %w", err)
253246
}
254247

255-
// ID is used by Observe to look up the version, which may take some time to appear.
248+
// Status.ID is informational only — Observe never reads it. Drift is
249+
// detected via the annotation fingerprint written below.
256250
schema.Status.ID = schemaID
257251

258-
// TODO: workaround for a stale-cache race in the managed. Remove once the reconciler fix lands.
259-
if err := r.persistStatusID(ctx, schema); err != nil {
260-
return fmt.Errorf("persisting Status.ID: %w", err)
261-
}
262-
263252
if schema.Spec.CompatibilityLevel != "" {
264253
if _, err := r.avnGen.ServiceSchemaRegistrySubjectConfigPut(
265254
ctx,
@@ -272,9 +261,37 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha
272261
}
273262
}
274263

264+
metav1.SetMetaDataAnnotation(
265+
&schema.ObjectMeta,
266+
kafkaSchemaAppliedFingerprintAnnotation,
267+
fingerprintSchema(schema, resolvedRefs),
268+
)
269+
275270
return nil
276271
}
277272

273+
// fingerprintSchema returns a stable hash of the provided schema.
274+
func fingerprintSchema(schema *v1alpha1.KafkaSchema, refs []kafkaschemaregistry.ReferenceIn) string {
275+
sorted := append([]kafkaschemaregistry.ReferenceIn(nil), refs...)
276+
sort.Slice(sorted, func(i, j int) bool { return sorted[i].Name < sorted[j].Name })
277+
278+
payload := struct {
279+
Schema string `json:"schema"`
280+
SchemaType kafkaschemaregistry.SchemaType `json:"schemaType"`
281+
CompatibilityLevel kafkaschemaregistry.CompatibilityType `json:"compatibilityLevel,omitempty"`
282+
References []kafkaschemaregistry.ReferenceIn `json:"references"`
283+
}{
284+
Schema: schema.Spec.Schema,
285+
SchemaType: schema.Spec.SchemaType,
286+
CompatibilityLevel: schema.Spec.CompatibilityLevel,
287+
References: sorted,
288+
}
289+
290+
buf, _ := json.Marshal(payload)
291+
sum := sha256.Sum256(buf)
292+
return hex.EncodeToString(sum[:])
293+
}
294+
278295
// resolveReferences turns Spec.References into the ReferenceIn slice.
279296
func (r *KafkaSchemaController) resolveReferences(
280297
ctx context.Context,
@@ -312,53 +329,6 @@ func (r *KafkaSchemaController) resolveReferences(
312329
return refs, nil
313330
}
314331

315-
// referencesEqual compares desired vs. registry references by name (path / $ref key).
316-
// Order is ignored. Names are enforced to be unique per KafkaSchema.
317-
func referencesEqual(desired []kafkaschemaregistry.ReferenceIn, got []kafkaschemaregistry.ReferenceOut) bool {
318-
if len(desired) != len(got) {
319-
return false
320-
}
321-
322-
byName := make(map[string]kafkaschemaregistry.ReferenceOut, len(got))
323-
for _, r := range got {
324-
byName[r.Name] = r
325-
}
326-
327-
// If the registry ever returned duplicate Names.
328-
if len(byName) != len(got) {
329-
return false
330-
}
331-
332-
for _, d := range desired {
333-
g, ok := byName[d.Name]
334-
if !ok || g.Subject != d.Subject || g.Version != d.Version {
335-
return false
336-
}
337-
}
338-
339-
return true
340-
}
341-
342-
// persistStatusID writes schema.Status.ID to the API server in its own status
343-
// subresource update, retrying on optimistic-concurrency conflicts.
344-
//
345-
// This is a workaround for the reconciler race issue.
346-
// It should be removed when updateStatus in the managed reconciler no longer clobbers
347-
// concurrently-written status fields.
348-
func (r *KafkaSchemaController) persistStatusID(ctx context.Context, schema *v1alpha1.KafkaSchema) error {
349-
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
350-
latest := &v1alpha1.KafkaSchema{}
351-
if err := r.Get(ctx, client.ObjectKeyFromObject(schema), latest); err != nil {
352-
return err
353-
}
354-
latest.Status.ID = schema.Status.ID
355-
if latest.Status.Conditions == nil {
356-
latest.Status.Conditions = []metav1.Condition{}
357-
}
358-
return r.Status().Update(ctx, latest)
359-
})
360-
}
361-
362332
func (r *KafkaSchemaController) Delete(ctx context.Context, schema *v1alpha1.KafkaSchema) error {
363333
// Block delete if any KafkaSchema in this namespace still imports us via kafkaSchemaRef.
364334
// Only catches kafkaSchemaRef dependents in the same namespace.

0 commit comments

Comments
 (0)