Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 98 additions & 70 deletions controllers/kafkaschema_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ package controllers

import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"sort"
Expand All @@ -25,6 +28,10 @@ import (
"github.com/aiven/aiven-operator/api/v1alpha1"
)

// kafkaSchemaAppliedFingerprintAnnotation stores a hash of the last
// schema body + resolved references + compatibility level.
const kafkaSchemaAppliedFingerprintAnnotation = "controllers.aiven.io/kafka-schema-applied"

// kafkaSchemaRefIndex is the cache index key for finding KafkaSchemas that
// reference another KafkaSchema by name.
const kafkaSchemaRefIndex = "spec.references.kafkaSchemaRef.name"
Expand Down Expand Up @@ -118,12 +125,14 @@ func kafkaSchemaVersionChangedPredicate() predicate.Predicate {
}
}

// Observe decides whether the registry already serves what the spec describes.
// Drift detection is driven by an annotation fingerprint of the last applied schema.
func (r *KafkaSchemaController) Observe(ctx context.Context, schema *v1alpha1.KafkaSchema) (Observation, error) {
if _, err := getServiceIfOperational(ctx, r.avnGen, schema.Spec.Project, schema.Spec.ServiceName); err != nil {
return Observation{}, err
}

versions, err := r.avnGen.ServiceSchemaRegistrySubjectVersionsGet(
_, err := r.avnGen.ServiceSchemaRegistrySubjectVersionsGet(
ctx,
schema.Spec.Project,
schema.Spec.ServiceName,
Expand All @@ -134,61 +143,39 @@ func (r *KafkaSchemaController) Observe(ctx context.Context, schema *v1alpha1.Ka
// The service is operational but the schema registry may not yet be ready.
return Observation{}, fmt.Errorf("%w: schema registry not ready", errPreconditionNotMet)
case isNotFound(err):
// Subject is not registered yet
// Subject is not registered yet.
return Observation{ResourceExists: false}, nil
case err != nil:
return Observation{}, fmt.Errorf("listing Kafka Schema versions: %w", err)
}

if schema.Status.ID == 0 {
// No ID tracked yet, fall through to Create; it is idempotent.
return Observation{ResourceExists: false}, nil
}

for _, v := range versions {
got, err := r.avnGen.ServiceSchemaRegistrySubjectVersionGet(
ctx,
schema.Spec.Project,
schema.Spec.ServiceName,
schema.Spec.SubjectName,
v,
)
if err != nil {
return Observation{}, fmt.Errorf("getting Kafka Schema version %d: %w", v, err)
}

if got.Id != schema.Status.ID {
continue
}

schema.Status.Version = got.Version

// A kafkaSchemaRef referent can advance without spec changing.
// Compare what spec.references currently resolves to.
desired, err := r.resolveReferences(ctx, schema)
var resolvedRefs []kafkaschemaregistry.ReferenceIn
if len(schema.Spec.References) > 0 {
refs, err := r.resolveReferences(ctx, schema)
switch {
case errors.Is(err, errPreconditionNotMet):
// Referent exists but its Status.Version is still 0 — soft-requeue.
return Observation{}, err
case err != nil:
return Observation{}, fmt.Errorf("resolving desired references: %w", err)
}
if !referencesEqual(desired, got.References) {
return Observation{ResourceExists: true, ResourceUpToDate: false}, nil
return Observation{}, fmt.Errorf("resolving references: %w", err)
}
resolvedRefs = refs
}
desiredFP := fingerprintSchema(schema, resolvedRefs)

meta.SetStatusCondition(&schema.Status.Conditions,
getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side"))
metav1.SetMetaDataAnnotation(&schema.ObjectMeta, instanceIsRunningAnnotation, "true")

return Observation{
ResourceExists: true,
ResourceUpToDate: hasLatestGeneration(schema),
}, nil
appliedFP, ok := schema.GetAnnotations()[kafkaSchemaAppliedFingerprintAnnotation]
if !ok || appliedFP != desiredFP {
return Observation{ResourceExists: true, ResourceUpToDate: false}, nil
}

// Tracked version is not visible yet.
return Observation{}, fmt.Errorf("%w: tracked schema ID %d not visible in registry", errPreconditionNotMet, schema.Status.ID)
meta.SetStatusCondition(&schema.Status.Conditions,
getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side"))
metav1.SetMetaDataAnnotation(&schema.ObjectMeta, instanceIsRunningAnnotation, "true")

return Observation{
ResourceExists: true,
ResourceUpToDate: hasLatestGeneration(schema),
}, nil
}

func (r *KafkaSchemaController) Create(ctx context.Context, schema *v1alpha1.KafkaSchema) (CreateResult, error) {
Expand Down Expand Up @@ -232,11 +219,13 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha
SchemaType: schema.Spec.SchemaType,
}

var resolvedRefs []kafkaschemaregistry.ReferenceIn
if len(schema.Spec.References) > 0 {
refs, err := r.resolveReferences(ctx, schema)
if err != nil {
return err
}
resolvedRefs = refs
postIn.References = &refs
}

Expand All @@ -251,8 +240,12 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha
return fmt.Errorf("cannot add Kafka Schema Subject: %w", err)
}

// ID is used by Observe to look up the version, which may take some time to appear.
version, err := r.lookupVersionForID(ctx, schema, schemaID)
if err != nil {
return fmt.Errorf("locating version for schema ID %d: %w", schemaID, err)
}
schema.Status.ID = schemaID
schema.Status.Version = version

if schema.Spec.CompatibilityLevel != "" {
if _, err := r.avnGen.ServiceSchemaRegistrySubjectConfigPut(
Expand All @@ -266,9 +259,71 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha
}
}

metav1.SetMetaDataAnnotation(
&schema.ObjectMeta,
kafkaSchemaAppliedFingerprintAnnotation,
fingerprintSchema(schema, resolvedRefs),
)

return nil
}

// lookupVersionForID returns the registry version holding the given schema id.
func (r *KafkaSchemaController) lookupVersionForID(
ctx context.Context, schema *v1alpha1.KafkaSchema, id int,
) (int, error) {
versions, err := r.avnGen.ServiceSchemaRegistrySubjectVersionsGet(
ctx,
schema.Spec.Project,
schema.Spec.ServiceName,
schema.Spec.SubjectName,
)
if err != nil {
return 0, fmt.Errorf("listing Kafka Schema versions: %w", err)
}

sort.Slice(versions, func(i, j int) bool { return versions[i] > versions[j] })
for _, v := range versions {
got, err := r.avnGen.ServiceSchemaRegistrySubjectVersionGet(
ctx,
schema.Spec.Project,
schema.Spec.ServiceName,
schema.Spec.SubjectName,
v,
)
if err != nil {
return 0, fmt.Errorf("getting Kafka Schema version %d: %w", v, err)
}
if got.Id == id {
return got.Version, nil
}
}

return 0, fmt.Errorf("%w: schema ID %d not visible in registry yet", errPreconditionNotMet, id)
}

// fingerprintSchema returns a stable hash of the provided schema.
func fingerprintSchema(schema *v1alpha1.KafkaSchema, refs []kafkaschemaregistry.ReferenceIn) string {
sorted := append([]kafkaschemaregistry.ReferenceIn(nil), refs...)
sort.Slice(sorted, func(i, j int) bool { return sorted[i].Name < sorted[j].Name })

payload := struct {
Schema string `json:"schema"`
SchemaType kafkaschemaregistry.SchemaType `json:"schemaType"`
CompatibilityLevel kafkaschemaregistry.CompatibilityType `json:"compatibilityLevel,omitempty"`
References []kafkaschemaregistry.ReferenceIn `json:"references"`
}{
Schema: schema.Spec.Schema,
SchemaType: schema.Spec.SchemaType,
CompatibilityLevel: schema.Spec.CompatibilityLevel,
References: sorted,
}

buf, _ := json.Marshal(payload)
sum := sha256.Sum256(buf)
return hex.EncodeToString(sum[:])
}

// resolveReferences turns Spec.References into the ReferenceIn slice.
func (r *KafkaSchemaController) resolveReferences(
ctx context.Context,
Expand Down Expand Up @@ -306,33 +361,6 @@ func (r *KafkaSchemaController) resolveReferences(
return refs, nil
}

// referencesEqual compares desired vs. registry references by name (path / $ref key).
// Order is ignored. Names are enforced to be unique per KafkaSchema.
func referencesEqual(desired []kafkaschemaregistry.ReferenceIn, got []kafkaschemaregistry.ReferenceOut) bool {
if len(desired) != len(got) {
return false
}

byName := make(map[string]kafkaschemaregistry.ReferenceOut, len(got))
for _, r := range got {
byName[r.Name] = r
}

// If the registry ever returned duplicate Names.
if len(byName) != len(got) {
return false
}

for _, d := range desired {
g, ok := byName[d.Name]
if !ok || g.Subject != d.Subject || g.Version != d.Version {
return false
}
}

return true
}

func (r *KafkaSchemaController) Delete(ctx context.Context, schema *v1alpha1.KafkaSchema) error {
// Block delete if any KafkaSchema in this namespace still imports us via kafkaSchemaRef.
// Only catches kafkaSchemaRef dependents in the same namespace.
Expand Down
Loading
Loading