From aaade2461167de5475ff9920db44426c1a51d243 Mon Sep 17 00:00:00 2001 From: Myroslav Vivcharyk Date: Wed, 27 May 2026 16:46:21 +0200 Subject: [PATCH] chore(kafka_schema): remove status from control flow --- controllers/kafkaschema_controller.go | 168 +++++++------ controllers/kafkaschema_controller_test.go | 264 ++++++++++++--------- controllers/reconciler.go | 6 +- 3 files changed, 260 insertions(+), 178 deletions(-) diff --git a/controllers/kafkaschema_controller.go b/controllers/kafkaschema_controller.go index b35913bf..aa09e164 100644 --- a/controllers/kafkaschema_controller.go +++ b/controllers/kafkaschema_controller.go @@ -4,6 +4,9 @@ package controllers import ( "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" "errors" "fmt" "sort" @@ -25,6 +28,10 @@ import ( "github.com/aiven/aiven-operator/api/v1alpha1" ) +// kafkaSchemaAppliedFingerprintAnnotation stores a hash of the last +// schema body + resolved references + compatibility level. +const kafkaSchemaAppliedFingerprintAnnotation = "controllers.aiven.io/kafka-schema-applied" + // kafkaSchemaRefIndex is the cache index key for finding KafkaSchemas that // reference another KafkaSchema by name. const kafkaSchemaRefIndex = "spec.references.kafkaSchemaRef.name" @@ -118,12 +125,14 @@ func kafkaSchemaVersionChangedPredicate() predicate.Predicate { } } +// Observe decides whether the registry already serves what the spec describes. +// Drift detection is driven by an annotation fingerprint of the last applied schema. func (r *KafkaSchemaController) Observe(ctx context.Context, schema *v1alpha1.KafkaSchema) (Observation, error) { if _, err := getServiceIfOperational(ctx, r.avnGen, schema.Spec.Project, schema.Spec.ServiceName); err != nil { return Observation{}, err } - versions, err := r.avnGen.ServiceSchemaRegistrySubjectVersionsGet( + _, err := r.avnGen.ServiceSchemaRegistrySubjectVersionsGet( ctx, schema.Spec.Project, schema.Spec.ServiceName, @@ -134,61 +143,39 @@ func (r *KafkaSchemaController) Observe(ctx context.Context, schema *v1alpha1.Ka // The service is operational but the schema registry may not yet be ready. return Observation{}, fmt.Errorf("%w: schema registry not ready", errPreconditionNotMet) case isNotFound(err): - // Subject is not registered yet + // Subject is not registered yet. return Observation{ResourceExists: false}, nil case err != nil: return Observation{}, fmt.Errorf("listing Kafka Schema versions: %w", err) } - if schema.Status.ID == 0 { - // No ID tracked yet, fall through to Create; it is idempotent. - return Observation{ResourceExists: false}, nil - } - - for _, v := range versions { - got, err := r.avnGen.ServiceSchemaRegistrySubjectVersionGet( - ctx, - schema.Spec.Project, - schema.Spec.ServiceName, - schema.Spec.SubjectName, - v, - ) - if err != nil { - return Observation{}, fmt.Errorf("getting Kafka Schema version %d: %w", v, err) - } - - if got.Id != schema.Status.ID { - continue - } - - schema.Status.Version = got.Version - - // A kafkaSchemaRef referent can advance without spec changing. - // Compare what spec.references currently resolves to. - desired, err := r.resolveReferences(ctx, schema) + var resolvedRefs []kafkaschemaregistry.ReferenceIn + if len(schema.Spec.References) > 0 { + refs, err := r.resolveReferences(ctx, schema) switch { case errors.Is(err, errPreconditionNotMet): // Referent exists but its Status.Version is still 0 — soft-requeue. return Observation{}, err case err != nil: - return Observation{}, fmt.Errorf("resolving desired references: %w", err) - } - if !referencesEqual(desired, got.References) { - return Observation{ResourceExists: true, ResourceUpToDate: false}, nil + return Observation{}, fmt.Errorf("resolving references: %w", err) } + resolvedRefs = refs + } + desiredFP := fingerprintSchema(schema, resolvedRefs) - meta.SetStatusCondition(&schema.Status.Conditions, - getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side")) - metav1.SetMetaDataAnnotation(&schema.ObjectMeta, instanceIsRunningAnnotation, "true") - - return Observation{ - ResourceExists: true, - ResourceUpToDate: hasLatestGeneration(schema), - }, nil + appliedFP, ok := schema.GetAnnotations()[kafkaSchemaAppliedFingerprintAnnotation] + if !ok || appliedFP != desiredFP { + return Observation{ResourceExists: true, ResourceUpToDate: false}, nil } - // Tracked version is not visible yet. - return Observation{}, fmt.Errorf("%w: tracked schema ID %d not visible in registry", errPreconditionNotMet, schema.Status.ID) + meta.SetStatusCondition(&schema.Status.Conditions, + getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side")) + metav1.SetMetaDataAnnotation(&schema.ObjectMeta, instanceIsRunningAnnotation, "true") + + return Observation{ + ResourceExists: true, + ResourceUpToDate: hasLatestGeneration(schema), + }, nil } func (r *KafkaSchemaController) Create(ctx context.Context, schema *v1alpha1.KafkaSchema) (CreateResult, error) { @@ -232,11 +219,13 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha SchemaType: schema.Spec.SchemaType, } + var resolvedRefs []kafkaschemaregistry.ReferenceIn if len(schema.Spec.References) > 0 { refs, err := r.resolveReferences(ctx, schema) if err != nil { return err } + resolvedRefs = refs postIn.References = &refs } @@ -251,8 +240,12 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha return fmt.Errorf("cannot add Kafka Schema Subject: %w", err) } - // ID is used by Observe to look up the version, which may take some time to appear. + version, err := r.lookupVersionForID(ctx, schema, schemaID) + if err != nil { + return fmt.Errorf("locating version for schema ID %d: %w", schemaID, err) + } schema.Status.ID = schemaID + schema.Status.Version = version if schema.Spec.CompatibilityLevel != "" { if _, err := r.avnGen.ServiceSchemaRegistrySubjectConfigPut( @@ -266,9 +259,71 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha } } + metav1.SetMetaDataAnnotation( + &schema.ObjectMeta, + kafkaSchemaAppliedFingerprintAnnotation, + fingerprintSchema(schema, resolvedRefs), + ) + return nil } +// lookupVersionForID returns the registry version holding the given schema id. +func (r *KafkaSchemaController) lookupVersionForID( + ctx context.Context, schema *v1alpha1.KafkaSchema, id int, +) (int, error) { + versions, err := r.avnGen.ServiceSchemaRegistrySubjectVersionsGet( + ctx, + schema.Spec.Project, + schema.Spec.ServiceName, + schema.Spec.SubjectName, + ) + if err != nil { + return 0, fmt.Errorf("listing Kafka Schema versions: %w", err) + } + + sort.Slice(versions, func(i, j int) bool { return versions[i] > versions[j] }) + for _, v := range versions { + got, err := r.avnGen.ServiceSchemaRegistrySubjectVersionGet( + ctx, + schema.Spec.Project, + schema.Spec.ServiceName, + schema.Spec.SubjectName, + v, + ) + if err != nil { + return 0, fmt.Errorf("getting Kafka Schema version %d: %w", v, err) + } + if got.Id == id { + return got.Version, nil + } + } + + return 0, fmt.Errorf("%w: schema ID %d not visible in registry yet", errPreconditionNotMet, id) +} + +// fingerprintSchema returns a stable hash of the provided schema. +func fingerprintSchema(schema *v1alpha1.KafkaSchema, refs []kafkaschemaregistry.ReferenceIn) string { + sorted := append([]kafkaschemaregistry.ReferenceIn(nil), refs...) + sort.Slice(sorted, func(i, j int) bool { return sorted[i].Name < sorted[j].Name }) + + payload := struct { + Schema string `json:"schema"` + SchemaType kafkaschemaregistry.SchemaType `json:"schemaType"` + CompatibilityLevel kafkaschemaregistry.CompatibilityType `json:"compatibilityLevel,omitempty"` + References []kafkaschemaregistry.ReferenceIn `json:"references"` + }{ + Schema: schema.Spec.Schema, + SchemaType: schema.Spec.SchemaType, + CompatibilityLevel: schema.Spec.CompatibilityLevel, + References: sorted, + } + + buf, _ := json.Marshal(payload) + sum := sha256.Sum256(buf) + return hex.EncodeToString(sum[:]) +} + // resolveReferences turns Spec.References into the ReferenceIn slice. func (r *KafkaSchemaController) resolveReferences( ctx context.Context, @@ -306,33 +361,6 @@ func (r *KafkaSchemaController) resolveReferences( return refs, nil } -// referencesEqual compares desired vs. registry references by name (path / $ref key). -// Order is ignored. Names are enforced to be unique per KafkaSchema. -func referencesEqual(desired []kafkaschemaregistry.ReferenceIn, got []kafkaschemaregistry.ReferenceOut) bool { - if len(desired) != len(got) { - return false - } - - byName := make(map[string]kafkaschemaregistry.ReferenceOut, len(got)) - for _, r := range got { - byName[r.Name] = r - } - - // If the registry ever returned duplicate Names. - if len(byName) != len(got) { - return false - } - - for _, d := range desired { - g, ok := byName[d.Name] - if !ok || g.Subject != d.Subject || g.Version != d.Version { - return false - } - } - - return true -} - func (r *KafkaSchemaController) Delete(ctx context.Context, schema *v1alpha1.KafkaSchema) error { // Block delete if any KafkaSchema in this namespace still imports us via kafkaSchemaRef. // Only catches kafkaSchemaRef dependents in the same namespace. diff --git a/controllers/kafkaschema_controller_test.go b/controllers/kafkaschema_controller_test.go index cd11331e..85ef858d 100644 --- a/controllers/kafkaschema_controller_test.go +++ b/controllers/kafkaschema_controller_test.go @@ -102,6 +102,14 @@ func TestKafkaSchemaReconciler(t *testing.T) { in.References == nil }), ).Return(42, nil).Once() + // applySchema looks up the version that holds the freshly-written id + // so Status.Version reflects what THIS CR registered. + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). + Return([]int{1}, nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 1). + Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 42, Version: 1}, nil).Once() r, res, err := runKafkaSchemaScenario(t, schema, avn) require.NoError(t, err) @@ -112,6 +120,9 @@ func TestKafkaSchemaReconciler(t *testing.T) { require.Equal(t, "1", got.Annotations[processedGenerationAnnotation]) require.NotContains(t, got.Annotations, instanceIsRunningAnnotation) require.Equal(t, 42, got.Status.ID) + require.Equal(t, 1, got.Status.Version, "Status.Version must be the version that holds the freshly-written id") + require.Equal(t, fingerprintSchema(schema, nil), got.Annotations[kafkaSchemaAppliedFingerprintAnnotation], + "applySchema must record the applied fingerprint so the next Observe sees no drift") }) t.Run("Creates KafkaSchema with compatibility level", func(t *testing.T) { @@ -130,6 +141,12 @@ func TestKafkaSchemaReconciler(t *testing.T) { ServiceSchemaRegistrySubjectVersionPost( mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, mock.Anything, ).Return(7, nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). + Return([]int{1}, nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 1). + Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 7, Version: 1}, nil).Once() avn.EXPECT(). ServiceSchemaRegistrySubjectConfigPut( mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, @@ -173,6 +190,12 @@ func TestKafkaSchemaReconciler(t *testing.T) { return ref.Name == "common.proto" && ref.Subject == "common-subject" && ref.Version == 1 }), ).Return(11, nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). + Return([]int{1}, nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 1). + Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 11, Version: 1}, nil).Once() r, res, err := runKafkaSchemaScenario(t, schema, avn) require.NoError(t, err) @@ -183,11 +206,15 @@ func TestKafkaSchemaReconciler(t *testing.T) { require.Equal(t, 11, got.Status.ID) }) - t.Run("Marks KafkaSchema running when tracked ID is visible", func(t *testing.T) { + t.Run("Marks KafkaSchema running when applied fingerprint matches the spec", func(t *testing.T) { schema := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) schema.Generation = 1 - schema.Annotations = map[string]string{processedGenerationAnnotation: "1"} + schema.Annotations = map[string]string{ + processedGenerationAnnotation: "1", + kafkaSchemaAppliedFingerprintAnnotation: fingerprintSchema(schema, nil), + } schema.Status.ID = 42 + schema.Status.Version = 2 avn := avngen.NewMockClient(t) avn.EXPECT(). @@ -195,10 +222,7 @@ func TestKafkaSchemaReconciler(t *testing.T) { Return(runningService(), nil).Once() avn.EXPECT(). ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). - Return([]int{1}, nil).Once() - avn.EXPECT(). - ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 1). - Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 42, Version: 1}, nil).Once() + Return([]int{1, 3, 2}, nil).Once() r, res, err := runKafkaSchemaScenario(t, schema, avn) require.NoError(t, err) @@ -206,16 +230,20 @@ func TestKafkaSchemaReconciler(t *testing.T) { got := &v1alpha1.KafkaSchema{} require.NoError(t, r.Get(t.Context(), types.NamespacedName{Name: schema.Name, Namespace: schema.Namespace}, got)) - require.Equal(t, 1, got.Status.Version) require.Equal(t, "true", got.Annotations[instanceIsRunningAnnotation]) + require.Equal(t, 2, got.Status.Version, + "Observe must not clobber Status.Version with the registry's max") }) - t.Run("Updates KafkaSchema when generation changes", func(t *testing.T) { + t.Run("Updates KafkaSchema when fingerprint disagrees with spec", func(t *testing.T) { schema := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) schema.Generation = 2 + // Stale fingerprint pinned to a previous spec body — drift is detected + // independently of generation. schema.Annotations = map[string]string{ - processedGenerationAnnotation: "1", - instanceIsRunningAnnotation: "true", + processedGenerationAnnotation: "1", + instanceIsRunningAnnotation: "true", + kafkaSchemaAppliedFingerprintAnnotation: "stale-fingerprint", } schema.Status.ID = 42 schema.Status.Version = 1 @@ -227,13 +255,16 @@ func TestKafkaSchemaReconciler(t *testing.T) { avn.EXPECT(). ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). Return([]int{1}, nil).Once() - avn.EXPECT(). - ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 1). - Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 42, Version: 1}, nil).Once() avn.EXPECT(). ServiceSchemaRegistrySubjectVersionPost( mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, mock.Anything, ).Return(99, nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). + Return([]int{1, 2}, nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 2). + Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 99, Version: 2}, nil).Once() r, res, err := runKafkaSchemaScenario(t, schema, avn) require.NoError(t, err) @@ -244,12 +275,16 @@ func TestKafkaSchemaReconciler(t *testing.T) { require.Equal(t, "2", got.Annotations[processedGenerationAnnotation]) require.NotContains(t, got.Annotations, instanceIsRunningAnnotation) require.Equal(t, 99, got.Status.ID) + require.Equal(t, 2, got.Status.Version, "Status.Version must follow the freshly-written id") + require.Equal(t, fingerprintSchema(schema, nil), got.Annotations[kafkaSchemaAppliedFingerprintAnnotation], + "applySchema must overwrite the stale fingerprint") }) - t.Run("Requeues when tracked ID is not yet visible in the registry", func(t *testing.T) { + t.Run("Re-applies via idempotent POST when no fingerprint annotation is set", func(t *testing.T) { schema := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) schema.Generation = 1 schema.Annotations = map[string]string{processedGenerationAnnotation: "1"} + // Stale Status.ID schema.Status.ID = 42 avn := avngen.NewMockClient(t) @@ -259,6 +294,14 @@ func TestKafkaSchemaReconciler(t *testing.T) { avn.EXPECT(). ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). Return([]int{1}, nil).Once() + // Observe never iterates versions; only applySchema does, after POST. + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionPost( + mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, mock.Anything, + ).Return(7, nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). + Return([]int{1}, nil).Once() avn.EXPECT(). ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 1). Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 7, Version: 1}, nil).Once() @@ -269,7 +312,10 @@ func TestKafkaSchemaReconciler(t *testing.T) { got := &v1alpha1.KafkaSchema{} require.NoError(t, r.Get(t.Context(), types.NamespacedName{Name: schema.Name, Namespace: schema.Namespace}, got)) - require.NotContains(t, got.Annotations, instanceIsRunningAnnotation) + require.Equal(t, 7, got.Status.ID, "Status.ID is overwritten by the POST response") + require.Equal(t, 1, got.Status.Version, "Status.Version follows the freshly-written id") + require.Equal(t, fingerprintSchema(schema, nil), got.Annotations[kafkaSchemaAppliedFingerprintAnnotation], + "the fingerprint annotation is the source of truth for drift detection on the next pass") }) // Soft-delete followed by hard-delete @@ -352,6 +398,12 @@ func TestKafkaSchemaReconciler(t *testing.T) { return ref.Name == "common.proto" && ref.Subject == "resolved-subject" && ref.Version == 7 }), ).Return(11, nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). + Return([]int{1}, nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 1). + Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 11, Version: 1}, nil).Once() r, res, err := runKafkaSchemaScenario(t, schema, avn, referent) require.NoError(t, err) @@ -412,6 +464,12 @@ func TestKafkaSchemaReconciler(t *testing.T) { second.Name == "shared.proto" && second.Subject == "shared-subject" && second.Version == 3 }), ).Return(55, nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). + Return([]int{1}, nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 1). + Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 55, Version: 1}, nil).Once() r, res, err := runKafkaSchemaScenario(t, schema, avn, referent) require.NoError(t, err) @@ -575,6 +633,12 @@ func TestKafkaSchemaReconciler(t *testing.T) { ref.Version == 7 }), ).Return(99, nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, dependent.Spec.Project, dependent.Spec.ServiceName, dependent.Spec.SubjectName). + Return([]int{1}, nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionGet(mock.Anything, dependent.Spec.Project, dependent.Spec.ServiceName, dependent.Spec.SubjectName, 1). + Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 99, Version: 1}, nil).Once() res, err = r.Reconcile(t.Context(), ctrlruntime.Request{ NamespacedName: types.NamespacedName{Name: dependent.Name, Namespace: dependent.Namespace}, @@ -613,15 +677,21 @@ func TestKafkaSchemaReconciler(t *testing.T) { schema := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) schema.Generation = 1 - schema.Annotations = map[string]string{ - processedGenerationAnnotation: "1", - instanceIsRunningAnnotation: "true", - } schema.Spec.SchemaType = kafkaschemaregistry.SchemaTypeProtobuf schema.Spec.References = []v1alpha1.SchemaReference{ {Name: "common.proto", KafkaSchemaRef: &v1alpha1.LocalKafkaSchemaRef{Name: "referent"}}, } - // Dependent's own Status.ID was assigned at version=1 of the referent. + // Fingerprint pinned to the referent's previous version (1). + // When the referent advances to version 2 the desired fingerprint + // changes and Observe detects drift without ever reading Status. + oldFP := fingerprintSchema(schema, []kafkaschemaregistry.ReferenceIn{ + {Name: "common.proto", Subject: "resolved-subject", Version: 1}, + }) + schema.Annotations = map[string]string{ + processedGenerationAnnotation: "1", + instanceIsRunningAnnotation: "true", + kafkaSchemaAppliedFingerprintAnnotation: oldFP, + } schema.Status.ID = 50 schema.Status.Version = 1 @@ -629,13 +699,9 @@ func TestKafkaSchemaReconciler(t *testing.T) { avn.EXPECT(). ServiceGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, mock.Anything). Return(runningService(), nil).Once() - // The registry currently has the dependent at version=1. Observe will see Status.ID match. avn.EXPECT(). ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). Return([]int{1}, nil).Once() - avn.EXPECT(). - ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 1). - Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 50, Version: 1}, nil).Once() // EXPECTED: a fresh POST that carries the referent's NEW version (2). avn.EXPECT(). @@ -649,6 +715,12 @@ func TestKafkaSchemaReconciler(t *testing.T) { return ref.Name == "common.proto" && ref.Subject == "resolved-subject" && ref.Version == 2 }), ).Return(60, nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). + Return([]int{1, 2}, nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 2). + Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 60, Version: 2}, nil).Once() r, res, err := runKafkaSchemaScenario(t, schema, avn, referent) require.NoError(t, err) @@ -660,18 +732,26 @@ func TestKafkaSchemaReconciler(t *testing.T) { require.NotContains(t, got.Annotations, instanceIsRunningAnnotation, "Update must clear the running annotation when re-POSTing") }) - // Remove all references when Spec.References is empty, but - // the registry currently serves the schema with reference still attached. + // Spec drops all references. Fingerprint annotation still reflects an + // earlier WITH-references state, so Observe sees drift and re-POSTs + // without consulting Status or fetching individual versions. t.Run("Re-POSTs without References when spec drops all references", func(t *testing.T) { schema := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) schema.Generation = 1 - schema.Annotations = map[string]string{ - processedGenerationAnnotation: "1", - instanceIsRunningAnnotation: "true", - } schema.Spec.SchemaType = kafkaschemaregistry.SchemaTypeProtobuf - // Spec carries no references. schema.Spec.References = nil + + // Fingerprint pinned to a previous spec that carried a reference. + stalePrev := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) + stalePrev.Spec.SchemaType = kafkaschemaregistry.SchemaTypeProtobuf + staleFP := fingerprintSchema(stalePrev, []kafkaschemaregistry.ReferenceIn{ + {Name: "stale.proto", Subject: "stale-subject", Version: 1}, + }) + schema.Annotations = map[string]string{ + processedGenerationAnnotation: "1", + instanceIsRunningAnnotation: "true", + kafkaSchemaAppliedFingerprintAnnotation: staleFP, + } schema.Status.ID = 77 schema.Status.Version = 3 @@ -682,16 +762,6 @@ func TestKafkaSchemaReconciler(t *testing.T) { avn.EXPECT(). ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). Return([]int{3}, nil).Once() - // Registry reports one reference. Observe must treat this as stale. - avn.EXPECT(). - ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 3). - Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{ - Id: 77, - Version: 3, - References: []kafkaschemaregistry.ReferenceOut{ - {Name: "stale.proto", Subject: "stale-subject", Version: 1}, - }, - }, nil).Once() avn.EXPECT(). ServiceSchemaRegistrySubjectVersionPost( @@ -700,6 +770,12 @@ func TestKafkaSchemaReconciler(t *testing.T) { return in.References == nil }), ).Return(78, nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). + Return([]int{3, 4}, nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 4). + Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 78, Version: 4}, nil).Once() r, res, err := runKafkaSchemaScenario(t, schema, avn) require.NoError(t, err) @@ -946,82 +1022,56 @@ func TestKafkaSchemaRefIndexValues(t *testing.T) { }) } -// The schema parser identifies refs by name (the import path / $ref key), ordering guarantee -// is not documented. Reordered ref lists must compare equal. -func TestReferencesEqual(t *testing.T) { - in := func(name, subject string, version int) kafkaschemaregistry.ReferenceIn { - return kafkaschemaregistry.ReferenceIn{Name: name, Subject: subject, Version: version} - } - out := func(name, subject string, version int) kafkaschemaregistry.ReferenceOut { - return kafkaschemaregistry.ReferenceOut{Name: name, Subject: subject, Version: version} - } - - t.Run("both empty", func(t *testing.T) { - require.True(t, referencesEqual(nil, nil)) - }) - - t.Run("different lengths", func(t *testing.T) { - require.False(t, referencesEqual( - []kafkaschemaregistry.ReferenceIn{in("a.proto", "a", 1)}, - []kafkaschemaregistry.ReferenceOut{out("a.proto", "a", 1), out("b.proto", "b", 1)}, - )) +func TestFingerprintSchema(t *testing.T) { + t.Run("empty CompatibilityLevel must equal absent CompatibilityLevel", func(t *testing.T) { + absent := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) + empty := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) + empty.Spec.CompatibilityLevel = "" + require.Equal(t, fingerprintSchema(absent, nil), fingerprintSchema(empty, nil), + "empty CompatibilityLevel must hash identically to an absent one — applySchema treats both as 'unmanaged'") }) - t.Run("identical, same order", func(t *testing.T) { - require.True(t, referencesEqual( - []kafkaschemaregistry.ReferenceIn{in("a.proto", "a", 1), in("b.proto", "b", 2)}, - []kafkaschemaregistry.ReferenceOut{out("a.proto", "a", 1), out("b.proto", "b", 2)}, - )) + t.Run("non-empty CompatibilityLevel changes the fingerprint", func(t *testing.T) { + absent := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) + set := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) + set.Spec.CompatibilityLevel = kafkaschemaregistry.CompatibilityTypeBackward + require.NotEqual(t, fingerprintSchema(absent, nil), fingerprintSchema(set, nil), + "setting a CompatibilityLevel must change the fingerprint so the next pass detects drift and PUTs the config") }) - t.Run("identical, reordered", func(t *testing.T) { - require.True(t, referencesEqual( - []kafkaschemaregistry.ReferenceIn{in("a.proto", "a", 1), in("b.proto", "b", 2)}, - []kafkaschemaregistry.ReferenceOut{out("b.proto", "b", 2), out("a.proto", "a", 1)}, - )) + t.Run("schema body change flips the fingerprint", func(t *testing.T) { + a := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) + b := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) + b.Spec.Schema = `{"type":"record","name":"Test","fields":[{"name":"id","type":"int"}]}` + require.NotEqual(t, fingerprintSchema(a, nil), fingerprintSchema(b, nil)) }) - t.Run("subject mismatch under same name", func(t *testing.T) { - require.False(t, referencesEqual( - []kafkaschemaregistry.ReferenceIn{in("a.proto", "a", 1)}, - []kafkaschemaregistry.ReferenceOut{out("a.proto", "a-other", 1)}, - )) + t.Run("schema type change flips the fingerprint", func(t *testing.T) { + a := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) + b := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) + b.Spec.SchemaType = kafkaschemaregistry.SchemaTypeProtobuf + require.NotEqual(t, fingerprintSchema(a, nil), fingerprintSchema(b, nil)) }) - t.Run("version mismatch under same name", func(t *testing.T) { - require.False(t, referencesEqual( - []kafkaschemaregistry.ReferenceIn{in("a.proto", "a", 1)}, - []kafkaschemaregistry.ReferenceOut{out("a.proto", "a", 2)}, - )) - }) - - t.Run("name not present in got", func(t *testing.T) { - require.False(t, referencesEqual( - []kafkaschemaregistry.ReferenceIn{in("a.proto", "a", 1)}, - []kafkaschemaregistry.ReferenceOut{out("z.proto", "a", 1)}, - )) - }) - - t.Run("desired empty, got non-empty", func(t *testing.T) { - require.False(t, referencesEqual( - nil, - []kafkaschemaregistry.ReferenceOut{out("a.proto", "a", 1)}, - )) - }) - - t.Run("desired non-empty, got empty", func(t *testing.T) { - require.False(t, referencesEqual( - []kafkaschemaregistry.ReferenceIn{in("a.proto", "a", 1)}, - nil, - )) + t.Run("references are order-insensitive", func(t *testing.T) { + s := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) + refsA := []kafkaschemaregistry.ReferenceIn{ + {Name: "a.proto", Subject: "a", Version: 1}, + {Name: "b.proto", Subject: "b", Version: 2}, + } + refsB := []kafkaschemaregistry.ReferenceIn{ + {Name: "b.proto", Subject: "b", Version: 2}, + {Name: "a.proto", Subject: "a", Version: 1}, + } + require.Equal(t, fingerprintSchema(s, refsA), fingerprintSchema(s, refsB)) }) - // If the registry returns duplicate Names, the length check must reject the comparison. - t.Run("duplicate names in got are not equal", func(t *testing.T) { - require.False(t, referencesEqual( - []kafkaschemaregistry.ReferenceIn{in("a.proto", "a", 1)}, - []kafkaschemaregistry.ReferenceOut{out("a.proto", "a", 1), out("a.proto", "a", 1)}, - )) + t.Run("a reference version bump flips the fingerprint", func(t *testing.T) { + s := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) + v1 := []kafkaschemaregistry.ReferenceIn{{Name: "a.proto", Subject: "a", Version: 1}} + v2 := []kafkaschemaregistry.ReferenceIn{{Name: "a.proto", Subject: "a", Version: 2}} + require.NotEqual(t, fingerprintSchema(s, v1), fingerprintSchema(s, v2), + "a referent advancing to a new version must re-fingerprint so dependents re-POST") }) } diff --git a/controllers/reconciler.go b/controllers/reconciler.go index 35da9bdb..5e8d960f 100644 --- a/controllers/reconciler.go +++ b/controllers/reconciler.go @@ -249,7 +249,11 @@ func (r *Reconciler[T]) persistReconcileState(ctx context.Context, orig v1alpha1 // Capture annotation changes before Status().Update because it may mutate obj metadata. annotations := map[string]any{} - for _, key := range []string{processedGenerationAnnotation, instanceIsRunningAnnotation} { + for _, key := range []string{ + processedGenerationAnnotation, + instanceIsRunningAnnotation, + kafkaSchemaAppliedFingerprintAnnotation, + } { origValue, origOk := orig.GetAnnotations()[key] value, ok := obj.GetAnnotations()[key] if origOk == ok && origValue == value {