Skip to content

Commit fa3e9f9

Browse files
committed
chore(kafka_schema): remove status from control flow
1 parent d470438 commit fa3e9f9

2 files changed

Lines changed: 278 additions & 202 deletions

File tree

controllers/kafkaschema_controller.go

Lines changed: 121 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ package controllers
44

55
import (
66
"context"
7+
"crypto/sha256"
8+
"encoding/hex"
9+
"encoding/json"
710
"errors"
811
"fmt"
912
"sort"
@@ -26,6 +29,10 @@ import (
2629
"github.com/aiven/aiven-operator/api/v1alpha1"
2730
)
2831

32+
// kafkaSchemaAppliedFingerprintAnnotation stores a hash of the last
33+
// schema body + resolved references + compatibility level.
34+
const kafkaSchemaAppliedFingerprintAnnotation = "controllers.aiven.io/kafka-schema-applied"
35+
2936
// kafkaSchemaRefIndex is the cache index key for finding KafkaSchemas that
3037
// reference another KafkaSchema by name.
3138
const kafkaSchemaRefIndex = "spec.references.kafkaSchemaRef.name"
@@ -119,12 +126,14 @@ func kafkaSchemaVersionChangedPredicate() predicate.Predicate {
119126
}
120127
}
121128

129+
// Observe decides whether the registry already serves what the spec describes.
130+
// Drift detection is driven by an annotation fingerprint of the last applied schema.
122131
func (r *KafkaSchemaController) Observe(ctx context.Context, schema *v1alpha1.KafkaSchema) (Observation, error) {
123132
if _, err := getServiceIfOperational(ctx, r.avnGen, schema.Spec.Project, schema.Spec.ServiceName); err != nil {
124133
return Observation{}, err
125134
}
126135

127-
versions, err := r.avnGen.ServiceSchemaRegistrySubjectVersionsGet(
136+
_, err := r.avnGen.ServiceSchemaRegistrySubjectVersionsGet(
128137
ctx,
129138
schema.Spec.Project,
130139
schema.Spec.ServiceName,
@@ -135,61 +144,39 @@ func (r *KafkaSchemaController) Observe(ctx context.Context, schema *v1alpha1.Ka
135144
// The service is operational but the schema registry may not yet be ready.
136145
return Observation{}, fmt.Errorf("%w: schema registry not ready", errPreconditionNotMet)
137146
case isNotFound(err):
138-
// Subject is not registered yet
147+
// Subject is not registered yet.
139148
return Observation{ResourceExists: false}, nil
140149
case err != nil:
141150
return Observation{}, fmt.Errorf("listing Kafka Schema versions: %w", err)
142151
}
143152

144-
if schema.Status.ID == 0 {
145-
// No ID tracked yet, fall through to Create; it is idempotent.
146-
return Observation{ResourceExists: false}, nil
147-
}
148-
149-
for _, v := range versions {
150-
got, err := r.avnGen.ServiceSchemaRegistrySubjectVersionGet(
151-
ctx,
152-
schema.Spec.Project,
153-
schema.Spec.ServiceName,
154-
schema.Spec.SubjectName,
155-
v,
156-
)
157-
if err != nil {
158-
return Observation{}, fmt.Errorf("getting Kafka Schema version %d: %w", v, err)
159-
}
160-
161-
if got.Id != schema.Status.ID {
162-
continue
163-
}
164-
165-
schema.Status.Version = got.Version
166-
167-
// A kafkaSchemaRef referent can advance without spec changing.
168-
// Compare what spec.references currently resolves to.
169-
desired, err := r.resolveReferences(ctx, schema)
153+
var resolvedRefs []kafkaschemaregistry.ReferenceIn
154+
if len(schema.Spec.References) > 0 {
155+
refs, err := r.resolveReferences(ctx, schema)
170156
switch {
171157
case errors.Is(err, errPreconditionNotMet):
172158
// Referent exists but its Status.Version is still 0 — soft-requeue.
173159
return Observation{}, err
174160
case err != nil:
175-
return Observation{}, fmt.Errorf("resolving desired references: %w", err)
161+
return Observation{}, fmt.Errorf("resolving references: %w", err)
176162
}
177-
if !referencesEqual(desired, got.References) {
178-
return Observation{ResourceExists: true, ResourceUpToDate: false}, nil
179-
}
180-
181-
meta.SetStatusCondition(&schema.Status.Conditions,
182-
getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side"))
183-
metav1.SetMetaDataAnnotation(&schema.ObjectMeta, instanceIsRunningAnnotation, "true")
163+
resolvedRefs = refs
164+
}
165+
desiredFP := fingerprintSchema(schema, resolvedRefs)
184166

185-
return Observation{
186-
ResourceExists: true,
187-
ResourceUpToDate: hasLatestGeneration(schema),
188-
}, nil
167+
appliedFP := schema.GetAnnotations()[kafkaSchemaAppliedFingerprintAnnotation]
168+
if appliedFP != desiredFP {
169+
return Observation{ResourceExists: true, ResourceUpToDate: false}, nil
189170
}
190171

191-
// Tracked version is not visible yet.
192-
return Observation{}, fmt.Errorf("%w: tracked schema ID %d not visible in registry", errPreconditionNotMet, schema.Status.ID)
172+
meta.SetStatusCondition(&schema.Status.Conditions,
173+
getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side"))
174+
metav1.SetMetaDataAnnotation(&schema.ObjectMeta, instanceIsRunningAnnotation, "true")
175+
176+
return Observation{
177+
ResourceExists: true,
178+
ResourceUpToDate: hasLatestGeneration(schema),
179+
}, nil
193180
}
194181

195182
func (r *KafkaSchemaController) Create(ctx context.Context, schema *v1alpha1.KafkaSchema) (CreateResult, error) {
@@ -233,11 +220,13 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha
233220
SchemaType: schema.Spec.SchemaType,
234221
}
235222

223+
var resolvedRefs []kafkaschemaregistry.ReferenceIn
236224
if len(schema.Spec.References) > 0 {
237225
refs, err := r.resolveReferences(ctx, schema)
238226
if err != nil {
239227
return err
240228
}
229+
resolvedRefs = refs
241230
postIn.References = &refs
242231
}
243232

@@ -252,13 +241,12 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha
252241
return fmt.Errorf("cannot add Kafka Schema Subject: %w", err)
253242
}
254243

255-
// ID is used by Observe to look up the version, which may take some time to appear.
256-
schema.Status.ID = schemaID
257-
258-
// TODO: workaround for a stale-cache race in the managed. Remove once the reconciler fix lands.
259-
if err := r.persistStatusID(ctx, schema); err != nil {
260-
return fmt.Errorf("persisting Status.ID: %w", err)
244+
version, err := r.lookupVersionForID(ctx, schema, schemaID)
245+
if err != nil {
246+
return fmt.Errorf("locating version for schema ID %d: %w", schemaID, err)
261247
}
248+
schema.Status.ID = schemaID
249+
schema.Status.Version = version
262250

263251
if schema.Spec.CompatibilityLevel != "" {
264252
if _, err := r.avnGen.ServiceSchemaRegistrySubjectConfigPut(
@@ -272,9 +260,94 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha
272260
}
273261
}
274262

263+
metav1.SetMetaDataAnnotation(
264+
&schema.ObjectMeta,
265+
kafkaSchemaAppliedFingerprintAnnotation,
266+
fingerprintSchema(schema, resolvedRefs),
267+
)
268+
269+
// TODO: workaround for the stale-status race in the managed reconciler.
270+
// Remove once updateStatus stops clobbering concurrently-written status fields.
271+
if err := r.persistAppliedStatus(ctx, schema); err != nil {
272+
return fmt.Errorf("persisting Status: %w", err)
273+
}
274+
275275
return nil
276276
}
277277

278+
// lookupVersionForID returns the registry version holding the given schema id.
279+
func (r *KafkaSchemaController) lookupVersionForID(
280+
ctx context.Context, schema *v1alpha1.KafkaSchema, id int,
281+
) (int, error) {
282+
versions, err := r.avnGen.ServiceSchemaRegistrySubjectVersionsGet(
283+
ctx,
284+
schema.Spec.Project,
285+
schema.Spec.ServiceName,
286+
schema.Spec.SubjectName,
287+
)
288+
if err != nil {
289+
return 0, fmt.Errorf("listing Kafka Schema versions: %w", err)
290+
}
291+
292+
sort.Slice(versions, func(i, j int) bool { return versions[i] > versions[j] })
293+
for _, v := range versions {
294+
got, err := r.avnGen.ServiceSchemaRegistrySubjectVersionGet(
295+
ctx,
296+
schema.Spec.Project,
297+
schema.Spec.ServiceName,
298+
schema.Spec.SubjectName,
299+
v,
300+
)
301+
if err != nil {
302+
return 0, fmt.Errorf("getting Kafka Schema version %d: %w", v, err)
303+
}
304+
if got.Id == id {
305+
return got.Version, nil
306+
}
307+
}
308+
309+
return 0, fmt.Errorf("%w: schema ID %d not visible in registry yet", errPreconditionNotMet, id)
310+
}
311+
312+
// persistAppliedStatus writes Status.ID and Status.Version in their own status-subresource update.
313+
// TODO: workaround for the stale-status race
314+
func (r *KafkaSchemaController) persistAppliedStatus(ctx context.Context, schema *v1alpha1.KafkaSchema) error {
315+
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
316+
latest := &v1alpha1.KafkaSchema{}
317+
if err := r.Get(ctx, client.ObjectKeyFromObject(schema), latest); err != nil {
318+
return err
319+
}
320+
latest.Status.ID = schema.Status.ID
321+
latest.Status.Version = schema.Status.Version
322+
if latest.Status.Conditions == nil {
323+
latest.Status.Conditions = []metav1.Condition{}
324+
}
325+
return r.Status().Update(ctx, latest)
326+
})
327+
}
328+
329+
// fingerprintSchema returns a stable hash of the provided schema.
330+
func fingerprintSchema(schema *v1alpha1.KafkaSchema, refs []kafkaschemaregistry.ReferenceIn) string {
331+
sorted := append([]kafkaschemaregistry.ReferenceIn(nil), refs...)
332+
sort.Slice(sorted, func(i, j int) bool { return sorted[i].Name < sorted[j].Name })
333+
334+
payload := struct {
335+
Schema string `json:"schema"`
336+
SchemaType kafkaschemaregistry.SchemaType `json:"schemaType"`
337+
CompatibilityLevel kafkaschemaregistry.CompatibilityType `json:"compatibilityLevel,omitempty"`
338+
References []kafkaschemaregistry.ReferenceIn `json:"references"`
339+
}{
340+
Schema: schema.Spec.Schema,
341+
SchemaType: schema.Spec.SchemaType,
342+
CompatibilityLevel: schema.Spec.CompatibilityLevel,
343+
References: sorted,
344+
}
345+
346+
buf, _ := json.Marshal(payload)
347+
sum := sha256.Sum256(buf)
348+
return hex.EncodeToString(sum[:])
349+
}
350+
278351
// resolveReferences turns Spec.References into the ReferenceIn slice.
279352
func (r *KafkaSchemaController) resolveReferences(
280353
ctx context.Context,
@@ -312,53 +385,6 @@ func (r *KafkaSchemaController) resolveReferences(
312385
return refs, nil
313386
}
314387

315-
// referencesEqual compares desired vs. registry references by name (path / $ref key).
316-
// Order is ignored. Names are enforced to be unique per KafkaSchema.
317-
func referencesEqual(desired []kafkaschemaregistry.ReferenceIn, got []kafkaschemaregistry.ReferenceOut) bool {
318-
if len(desired) != len(got) {
319-
return false
320-
}
321-
322-
byName := make(map[string]kafkaschemaregistry.ReferenceOut, len(got))
323-
for _, r := range got {
324-
byName[r.Name] = r
325-
}
326-
327-
// If the registry ever returned duplicate Names.
328-
if len(byName) != len(got) {
329-
return false
330-
}
331-
332-
for _, d := range desired {
333-
g, ok := byName[d.Name]
334-
if !ok || g.Subject != d.Subject || g.Version != d.Version {
335-
return false
336-
}
337-
}
338-
339-
return true
340-
}
341-
342-
// persistStatusID writes schema.Status.ID to the API server in its own status
343-
// subresource update, retrying on optimistic-concurrency conflicts.
344-
//
345-
// This is a workaround for the reconciler race issue.
346-
// It should be removed when updateStatus in the managed reconciler no longer clobbers
347-
// concurrently-written status fields.
348-
func (r *KafkaSchemaController) persistStatusID(ctx context.Context, schema *v1alpha1.KafkaSchema) error {
349-
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
350-
latest := &v1alpha1.KafkaSchema{}
351-
if err := r.Get(ctx, client.ObjectKeyFromObject(schema), latest); err != nil {
352-
return err
353-
}
354-
latest.Status.ID = schema.Status.ID
355-
if latest.Status.Conditions == nil {
356-
latest.Status.Conditions = []metav1.Condition{}
357-
}
358-
return r.Status().Update(ctx, latest)
359-
})
360-
}
361-
362388
func (r *KafkaSchemaController) Delete(ctx context.Context, schema *v1alpha1.KafkaSchema) error {
363389
// Block delete if any KafkaSchema in this namespace still imports us via kafkaSchemaRef.
364390
// Only catches kafkaSchemaRef dependents in the same namespace.

0 commit comments

Comments
 (0)