Skip to content

Commit 56bc51c

Browse files
authored
chore(kafka_schema): add fingerprint annotation (#1187)
1 parent e44e6f7 commit 56bc51c

3 files changed

Lines changed: 260 additions & 178 deletions

File tree

controllers/kafkaschema_controller.go

Lines changed: 98 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ package controllers
44

55
import (
66
"context"
7+
"crypto/sha256"
8+
"encoding/hex"
9+
"encoding/json"
710
"errors"
811
"fmt"
912
"sort"
@@ -25,6 +28,10 @@ import (
2528
"github.com/aiven/aiven-operator/api/v1alpha1"
2629
)
2730

31+
// kafkaSchemaAppliedFingerprintAnnotation stores a hash of the last
32+
// schema body + resolved references + compatibility level.
33+
const kafkaSchemaAppliedFingerprintAnnotation = "controllers.aiven.io/kafka-schema-applied"
34+
2835
// kafkaSchemaRefIndex is the cache index key for finding KafkaSchemas that
2936
// reference another KafkaSchema by name.
3037
const kafkaSchemaRefIndex = "spec.references.kafkaSchemaRef.name"
@@ -118,12 +125,14 @@ func kafkaSchemaVersionChangedPredicate() predicate.Predicate {
118125
}
119126
}
120127

128+
// Observe decides whether the registry already serves what the spec describes.
129+
// Drift detection is driven by an annotation fingerprint of the last applied schema.
121130
func (r *KafkaSchemaController) Observe(ctx context.Context, schema *v1alpha1.KafkaSchema) (Observation, error) {
122131
if _, err := getServiceIfOperational(ctx, r.avnGen, schema.Spec.Project, schema.Spec.ServiceName); err != nil {
123132
return Observation{}, err
124133
}
125134

126-
versions, err := r.avnGen.ServiceSchemaRegistrySubjectVersionsGet(
135+
_, err := r.avnGen.ServiceSchemaRegistrySubjectVersionsGet(
127136
ctx,
128137
schema.Spec.Project,
129138
schema.Spec.ServiceName,
@@ -134,61 +143,39 @@ func (r *KafkaSchemaController) Observe(ctx context.Context, schema *v1alpha1.Ka
134143
// The service is operational but the schema registry may not yet be ready.
135144
return Observation{}, fmt.Errorf("%w: schema registry not ready", errPreconditionNotMet)
136145
case isNotFound(err):
137-
// Subject is not registered yet
146+
// Subject is not registered yet.
138147
return Observation{ResourceExists: false}, nil
139148
case err != nil:
140149
return Observation{}, fmt.Errorf("listing Kafka Schema versions: %w", err)
141150
}
142151

143-
if schema.Status.ID == 0 {
144-
// No ID tracked yet, fall through to Create; it is idempotent.
145-
return Observation{ResourceExists: false}, nil
146-
}
147-
148-
for _, v := range versions {
149-
got, err := r.avnGen.ServiceSchemaRegistrySubjectVersionGet(
150-
ctx,
151-
schema.Spec.Project,
152-
schema.Spec.ServiceName,
153-
schema.Spec.SubjectName,
154-
v,
155-
)
156-
if err != nil {
157-
return Observation{}, fmt.Errorf("getting Kafka Schema version %d: %w", v, err)
158-
}
159-
160-
if got.Id != schema.Status.ID {
161-
continue
162-
}
163-
164-
schema.Status.Version = got.Version
165-
166-
// A kafkaSchemaRef referent can advance without spec changing.
167-
// Compare what spec.references currently resolves to.
168-
desired, err := r.resolveReferences(ctx, schema)
152+
var resolvedRefs []kafkaschemaregistry.ReferenceIn
153+
if len(schema.Spec.References) > 0 {
154+
refs, err := r.resolveReferences(ctx, schema)
169155
switch {
170156
case errors.Is(err, errPreconditionNotMet):
171157
// Referent exists but its Status.Version is still 0 — soft-requeue.
172158
return Observation{}, err
173159
case err != nil:
174-
return Observation{}, fmt.Errorf("resolving desired references: %w", err)
175-
}
176-
if !referencesEqual(desired, got.References) {
177-
return Observation{ResourceExists: true, ResourceUpToDate: false}, nil
160+
return Observation{}, fmt.Errorf("resolving references: %w", err)
178161
}
162+
resolvedRefs = refs
163+
}
164+
desiredFP := fingerprintSchema(schema, resolvedRefs)
179165

180-
meta.SetStatusCondition(&schema.Status.Conditions,
181-
getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side"))
182-
metav1.SetMetaDataAnnotation(&schema.ObjectMeta, instanceIsRunningAnnotation, "true")
183-
184-
return Observation{
185-
ResourceExists: true,
186-
ResourceUpToDate: hasLatestGeneration(schema),
187-
}, nil
166+
appliedFP, ok := schema.GetAnnotations()[kafkaSchemaAppliedFingerprintAnnotation]
167+
if !ok || appliedFP != desiredFP {
168+
return Observation{ResourceExists: true, ResourceUpToDate: false}, nil
188169
}
189170

190-
// Tracked version is not visible yet.
191-
return Observation{}, fmt.Errorf("%w: tracked schema ID %d not visible in registry", errPreconditionNotMet, schema.Status.ID)
171+
meta.SetStatusCondition(&schema.Status.Conditions,
172+
getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side"))
173+
metav1.SetMetaDataAnnotation(&schema.ObjectMeta, instanceIsRunningAnnotation, "true")
174+
175+
return Observation{
176+
ResourceExists: true,
177+
ResourceUpToDate: hasLatestGeneration(schema),
178+
}, nil
192179
}
193180

194181
func (r *KafkaSchemaController) Create(ctx context.Context, schema *v1alpha1.KafkaSchema) (CreateResult, error) {
@@ -232,11 +219,13 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha
232219
SchemaType: schema.Spec.SchemaType,
233220
}
234221

222+
var resolvedRefs []kafkaschemaregistry.ReferenceIn
235223
if len(schema.Spec.References) > 0 {
236224
refs, err := r.resolveReferences(ctx, schema)
237225
if err != nil {
238226
return err
239227
}
228+
resolvedRefs = refs
240229
postIn.References = &refs
241230
}
242231

@@ -251,8 +240,12 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha
251240
return fmt.Errorf("cannot add Kafka Schema Subject: %w", err)
252241
}
253242

254-
// ID is used by Observe to look up the version, which may take some time to appear.
243+
version, err := r.lookupVersionForID(ctx, schema, schemaID)
244+
if err != nil {
245+
return fmt.Errorf("locating version for schema ID %d: %w", schemaID, err)
246+
}
255247
schema.Status.ID = schemaID
248+
schema.Status.Version = version
256249

257250
if schema.Spec.CompatibilityLevel != "" {
258251
if _, err := r.avnGen.ServiceSchemaRegistrySubjectConfigPut(
@@ -266,9 +259,71 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha
266259
}
267260
}
268261

262+
metav1.SetMetaDataAnnotation(
263+
&schema.ObjectMeta,
264+
kafkaSchemaAppliedFingerprintAnnotation,
265+
fingerprintSchema(schema, resolvedRefs),
266+
)
267+
269268
return nil
270269
}
271270

271+
// lookupVersionForID returns the registry version holding the given schema id.
272+
func (r *KafkaSchemaController) lookupVersionForID(
273+
ctx context.Context, schema *v1alpha1.KafkaSchema, id int,
274+
) (int, error) {
275+
versions, err := r.avnGen.ServiceSchemaRegistrySubjectVersionsGet(
276+
ctx,
277+
schema.Spec.Project,
278+
schema.Spec.ServiceName,
279+
schema.Spec.SubjectName,
280+
)
281+
if err != nil {
282+
return 0, fmt.Errorf("listing Kafka Schema versions: %w", err)
283+
}
284+
285+
sort.Slice(versions, func(i, j int) bool { return versions[i] > versions[j] })
286+
for _, v := range versions {
287+
got, err := r.avnGen.ServiceSchemaRegistrySubjectVersionGet(
288+
ctx,
289+
schema.Spec.Project,
290+
schema.Spec.ServiceName,
291+
schema.Spec.SubjectName,
292+
v,
293+
)
294+
if err != nil {
295+
return 0, fmt.Errorf("getting Kafka Schema version %d: %w", v, err)
296+
}
297+
if got.Id == id {
298+
return got.Version, nil
299+
}
300+
}
301+
302+
return 0, fmt.Errorf("%w: schema ID %d not visible in registry yet", errPreconditionNotMet, id)
303+
}
304+
305+
// fingerprintSchema returns a stable hash of the provided schema.
306+
func fingerprintSchema(schema *v1alpha1.KafkaSchema, refs []kafkaschemaregistry.ReferenceIn) string {
307+
sorted := append([]kafkaschemaregistry.ReferenceIn(nil), refs...)
308+
sort.Slice(sorted, func(i, j int) bool { return sorted[i].Name < sorted[j].Name })
309+
310+
payload := struct {
311+
Schema string `json:"schema"`
312+
SchemaType kafkaschemaregistry.SchemaType `json:"schemaType"`
313+
CompatibilityLevel kafkaschemaregistry.CompatibilityType `json:"compatibilityLevel,omitempty"`
314+
References []kafkaschemaregistry.ReferenceIn `json:"references"`
315+
}{
316+
Schema: schema.Spec.Schema,
317+
SchemaType: schema.Spec.SchemaType,
318+
CompatibilityLevel: schema.Spec.CompatibilityLevel,
319+
References: sorted,
320+
}
321+
322+
buf, _ := json.Marshal(payload)
323+
sum := sha256.Sum256(buf)
324+
return hex.EncodeToString(sum[:])
325+
}
326+
272327
// resolveReferences turns Spec.References into the ReferenceIn slice.
273328
func (r *KafkaSchemaController) resolveReferences(
274329
ctx context.Context,
@@ -306,33 +361,6 @@ func (r *KafkaSchemaController) resolveReferences(
306361
return refs, nil
307362
}
308363

309-
// referencesEqual compares desired vs. registry references by name (path / $ref key).
310-
// Order is ignored. Names are enforced to be unique per KafkaSchema.
311-
func referencesEqual(desired []kafkaschemaregistry.ReferenceIn, got []kafkaschemaregistry.ReferenceOut) bool {
312-
if len(desired) != len(got) {
313-
return false
314-
}
315-
316-
byName := make(map[string]kafkaschemaregistry.ReferenceOut, len(got))
317-
for _, r := range got {
318-
byName[r.Name] = r
319-
}
320-
321-
// If the registry ever returned duplicate Names.
322-
if len(byName) != len(got) {
323-
return false
324-
}
325-
326-
for _, d := range desired {
327-
g, ok := byName[d.Name]
328-
if !ok || g.Subject != d.Subject || g.Version != d.Version {
329-
return false
330-
}
331-
}
332-
333-
return true
334-
}
335-
336364
func (r *KafkaSchemaController) Delete(ctx context.Context, schema *v1alpha1.KafkaSchema) error {
337365
// Block delete if any KafkaSchema in this namespace still imports us via kafkaSchemaRef.
338366
// Only catches kafkaSchemaRef dependents in the same namespace.

0 commit comments

Comments
 (0)