Skip to content

Commit bd30f36

Browse files
authored
Merge 607a706 into e44e6f7
2 parents e44e6f7 + 607a706 commit bd30f36

2 files changed

Lines changed: 278 additions & 177 deletions

File tree

controllers/kafkaschema_controller.go

Lines changed: 121 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ package controllers
44

55
import (
66
"context"
7+
"crypto/sha256"
8+
"encoding/hex"
9+
"encoding/json"
710
"errors"
811
"fmt"
912
"sort"
@@ -25,6 +28,10 @@ import (
2528
"github.com/aiven/aiven-operator/api/v1alpha1"
2629
)
2730

31+
// kafkaSchemaAppliedFingerprintAnnotation stores a hash of the last
32+
// schema body + resolved references + compatibility level.
33+
const kafkaSchemaAppliedFingerprintAnnotation = "controllers.aiven.io/kafka-schema-applied"
34+
2835
// kafkaSchemaRefIndex is the cache index key for finding KafkaSchemas that
2936
// reference another KafkaSchema by name.
3037
const kafkaSchemaRefIndex = "spec.references.kafkaSchemaRef.name"
@@ -118,12 +125,14 @@ func kafkaSchemaVersionChangedPredicate() predicate.Predicate {
118125
}
119126
}
120127

128+
// Observe decides whether the registry already serves what the spec describes.
129+
// Drift detection is driven by an annotation fingerprint of the last applied schema.
121130
func (r *KafkaSchemaController) Observe(ctx context.Context, schema *v1alpha1.KafkaSchema) (Observation, error) {
122131
if _, err := getServiceIfOperational(ctx, r.avnGen, schema.Spec.Project, schema.Spec.ServiceName); err != nil {
123132
return Observation{}, err
124133
}
125134

126-
versions, err := r.avnGen.ServiceSchemaRegistrySubjectVersionsGet(
135+
_, err := r.avnGen.ServiceSchemaRegistrySubjectVersionsGet(
127136
ctx,
128137
schema.Spec.Project,
129138
schema.Spec.ServiceName,
@@ -134,61 +143,39 @@ func (r *KafkaSchemaController) Observe(ctx context.Context, schema *v1alpha1.Ka
134143
// The service is operational but the schema registry may not yet be ready.
135144
return Observation{}, fmt.Errorf("%w: schema registry not ready", errPreconditionNotMet)
136145
case isNotFound(err):
137-
// Subject is not registered yet
146+
// Subject is not registered yet.
138147
return Observation{ResourceExists: false}, nil
139148
case err != nil:
140149
return Observation{}, fmt.Errorf("listing Kafka Schema versions: %w", err)
141150
}
142151

143-
if schema.Status.ID == 0 {
144-
// No ID tracked yet, fall through to Create; it is idempotent.
145-
return Observation{ResourceExists: false}, nil
146-
}
147-
148-
for _, v := range versions {
149-
got, err := r.avnGen.ServiceSchemaRegistrySubjectVersionGet(
150-
ctx,
151-
schema.Spec.Project,
152-
schema.Spec.ServiceName,
153-
schema.Spec.SubjectName,
154-
v,
155-
)
156-
if err != nil {
157-
return Observation{}, fmt.Errorf("getting Kafka Schema version %d: %w", v, err)
158-
}
159-
160-
if got.Id != schema.Status.ID {
161-
continue
162-
}
163-
164-
schema.Status.Version = got.Version
165-
166-
// A kafkaSchemaRef referent can advance without spec changing.
167-
// Compare what spec.references currently resolves to.
168-
desired, err := r.resolveReferences(ctx, schema)
152+
var resolvedRefs []kafkaschemaregistry.ReferenceIn
153+
if len(schema.Spec.References) > 0 {
154+
refs, err := r.resolveReferences(ctx, schema)
169155
switch {
170156
case errors.Is(err, errPreconditionNotMet):
171157
// Referent exists but its Status.Version is still 0 — soft-requeue.
172158
return Observation{}, err
173159
case err != nil:
174-
return Observation{}, fmt.Errorf("resolving desired references: %w", err)
160+
return Observation{}, fmt.Errorf("resolving references: %w", err)
175161
}
176-
if !referencesEqual(desired, got.References) {
177-
return Observation{ResourceExists: true, ResourceUpToDate: false}, nil
178-
}
179-
180-
meta.SetStatusCondition(&schema.Status.Conditions,
181-
getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side"))
182-
metav1.SetMetaDataAnnotation(&schema.ObjectMeta, instanceIsRunningAnnotation, "true")
162+
resolvedRefs = refs
163+
}
164+
desiredFP := fingerprintSchema(schema, resolvedRefs)
183165

184-
return Observation{
185-
ResourceExists: true,
186-
ResourceUpToDate: hasLatestGeneration(schema),
187-
}, nil
166+
appliedFP := schema.GetAnnotations()[kafkaSchemaAppliedFingerprintAnnotation]
167+
if appliedFP != desiredFP {
168+
return Observation{ResourceExists: true, ResourceUpToDate: false}, nil
188169
}
189170

190-
// Tracked version is not visible yet.
191-
return Observation{}, fmt.Errorf("%w: tracked schema ID %d not visible in registry", errPreconditionNotMet, schema.Status.ID)
171+
meta.SetStatusCondition(&schema.Status.Conditions,
172+
getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side"))
173+
metav1.SetMetaDataAnnotation(&schema.ObjectMeta, instanceIsRunningAnnotation, "true")
174+
175+
return Observation{
176+
ResourceExists: true,
177+
ResourceUpToDate: hasLatestGeneration(schema),
178+
}, nil
192179
}
193180

194181
func (r *KafkaSchemaController) Create(ctx context.Context, schema *v1alpha1.KafkaSchema) (CreateResult, error) {
@@ -232,11 +219,13 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha
232219
SchemaType: schema.Spec.SchemaType,
233220
}
234221

222+
var resolvedRefs []kafkaschemaregistry.ReferenceIn
235223
if len(schema.Spec.References) > 0 {
236224
refs, err := r.resolveReferences(ctx, schema)
237225
if err != nil {
238226
return err
239227
}
228+
resolvedRefs = refs
240229
postIn.References = &refs
241230
}
242231

@@ -251,8 +240,12 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha
251240
return fmt.Errorf("cannot add Kafka Schema Subject: %w", err)
252241
}
253242

254-
// ID is used by Observe to look up the version, which may take some time to appear.
243+
version, err := r.lookupVersionForID(ctx, schema, schemaID)
244+
if err != nil {
245+
return fmt.Errorf("locating version for schema ID %d: %w", schemaID, err)
246+
}
255247
schema.Status.ID = schemaID
248+
schema.Status.Version = version
256249

257250
if schema.Spec.CompatibilityLevel != "" {
258251
if _, err := r.avnGen.ServiceSchemaRegistrySubjectConfigPut(
@@ -266,9 +259,94 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha
266259
}
267260
}
268261

262+
metav1.SetMetaDataAnnotation(
263+
&schema.ObjectMeta,
264+
kafkaSchemaAppliedFingerprintAnnotation,
265+
fingerprintSchema(schema, resolvedRefs),
266+
)
267+
268+
// TODO: workaround for the stale-status race in the managed reconciler.
269+
// Remove once updateStatus stops clobbering concurrently-written status fields.
270+
if err := r.persistAppliedStatus(ctx, schema); err != nil {
271+
return fmt.Errorf("persisting Status: %w", err)
272+
}
273+
269274
return nil
270275
}
271276

277+
// lookupVersionForID returns the registry version holding the given schema id.
278+
func (r *KafkaSchemaController) lookupVersionForID(
279+
ctx context.Context, schema *v1alpha1.KafkaSchema, id int,
280+
) (int, error) {
281+
versions, err := r.avnGen.ServiceSchemaRegistrySubjectVersionsGet(
282+
ctx,
283+
schema.Spec.Project,
284+
schema.Spec.ServiceName,
285+
schema.Spec.SubjectName,
286+
)
287+
if err != nil {
288+
return 0, fmt.Errorf("listing Kafka Schema versions: %w", err)
289+
}
290+
291+
sort.Slice(versions, func(i, j int) bool { return versions[i] > versions[j] })
292+
for _, v := range versions {
293+
got, err := r.avnGen.ServiceSchemaRegistrySubjectVersionGet(
294+
ctx,
295+
schema.Spec.Project,
296+
schema.Spec.ServiceName,
297+
schema.Spec.SubjectName,
298+
v,
299+
)
300+
if err != nil {
301+
return 0, fmt.Errorf("getting Kafka Schema version %d: %w", v, err)
302+
}
303+
if got.Id == id {
304+
return got.Version, nil
305+
}
306+
}
307+
308+
return 0, fmt.Errorf("%w: schema ID %d not visible in registry yet", errPreconditionNotMet, id)
309+
}
310+
311+
// persistAppliedStatus writes Status.ID and Status.Version in their own status-subresource update.
312+
// TODO: workaround for the stale-status race
313+
func (r *KafkaSchemaController) persistAppliedStatus(ctx context.Context, schema *v1alpha1.KafkaSchema) error {
314+
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
315+
latest := &v1alpha1.KafkaSchema{}
316+
if err := r.Get(ctx, client.ObjectKeyFromObject(schema), latest); err != nil {
317+
return err
318+
}
319+
latest.Status.ID = schema.Status.ID
320+
latest.Status.Version = schema.Status.Version
321+
if latest.Status.Conditions == nil {
322+
latest.Status.Conditions = []metav1.Condition{}
323+
}
324+
return r.Status().Update(ctx, latest)
325+
})
326+
}
327+
328+
// fingerprintSchema returns a stable hash of the provided schema.
329+
func fingerprintSchema(schema *v1alpha1.KafkaSchema, refs []kafkaschemaregistry.ReferenceIn) string {
330+
sorted := append([]kafkaschemaregistry.ReferenceIn(nil), refs...)
331+
sort.Slice(sorted, func(i, j int) bool { return sorted[i].Name < sorted[j].Name })
332+
333+
payload := struct {
334+
Schema string `json:"schema"`
335+
SchemaType kafkaschemaregistry.SchemaType `json:"schemaType"`
336+
CompatibilityLevel kafkaschemaregistry.CompatibilityType `json:"compatibilityLevel,omitempty"`
337+
References []kafkaschemaregistry.ReferenceIn `json:"references"`
338+
}{
339+
Schema: schema.Spec.Schema,
340+
SchemaType: schema.Spec.SchemaType,
341+
CompatibilityLevel: schema.Spec.CompatibilityLevel,
342+
References: sorted,
343+
}
344+
345+
buf, _ := json.Marshal(payload)
346+
sum := sha256.Sum256(buf)
347+
return hex.EncodeToString(sum[:])
348+
}
349+
272350
// resolveReferences turns Spec.References into the ReferenceIn slice.
273351
func (r *KafkaSchemaController) resolveReferences(
274352
ctx context.Context,
@@ -306,33 +384,6 @@ func (r *KafkaSchemaController) resolveReferences(
306384
return refs, nil
307385
}
308386

309-
// referencesEqual compares desired vs. registry references by name (path / $ref key).
310-
// Order is ignored. Names are enforced to be unique per KafkaSchema.
311-
func referencesEqual(desired []kafkaschemaregistry.ReferenceIn, got []kafkaschemaregistry.ReferenceOut) bool {
312-
if len(desired) != len(got) {
313-
return false
314-
}
315-
316-
byName := make(map[string]kafkaschemaregistry.ReferenceOut, len(got))
317-
for _, r := range got {
318-
byName[r.Name] = r
319-
}
320-
321-
// If the registry ever returned duplicate Names.
322-
if len(byName) != len(got) {
323-
return false
324-
}
325-
326-
for _, d := range desired {
327-
g, ok := byName[d.Name]
328-
if !ok || g.Subject != d.Subject || g.Version != d.Version {
329-
return false
330-
}
331-
}
332-
333-
return true
334-
}
335-
336387
func (r *KafkaSchemaController) Delete(ctx context.Context, schema *v1alpha1.KafkaSchema) error {
337388
// Block delete if any KafkaSchema in this namespace still imports us via kafkaSchemaRef.
338389
// Only catches kafkaSchemaRef dependents in the same namespace.

0 commit comments

Comments
 (0)