Skip to content

Commit 7afec87

Browse files
committed
fix(ci): use full check-mode for trunk in workflow_call
1 parent b1b4b14 commit 7afec87

2 files changed

Lines changed: 144 additions & 13 deletions

File tree

controllers/kafkaschema_controller.go

Lines changed: 72 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
apierrors "k8s.io/apimachinery/pkg/api/errors"
1818
"k8s.io/apimachinery/pkg/api/meta"
1919
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20+
"k8s.io/client-go/util/retry"
2021
ctrl "sigs.k8s.io/controller-runtime"
2122
"sigs.k8s.io/controller-runtime/pkg/builder"
2223
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -132,7 +133,7 @@ func (r *KafkaSchemaController) Observe(ctx context.Context, schema *v1alpha1.Ka
132133
return Observation{}, err
133134
}
134135

135-
versions, err := r.avnGen.ServiceSchemaRegistrySubjectVersionsGet(
136+
_, err := r.avnGen.ServiceSchemaRegistrySubjectVersionsGet(
136137
ctx,
137138
schema.Spec.Project,
138139
schema.Spec.ServiceName,
@@ -168,11 +169,6 @@ func (r *KafkaSchemaController) Observe(ctx context.Context, schema *v1alpha1.Ka
168169
return Observation{ResourceExists: true, ResourceUpToDate: false}, nil
169170
}
170171

171-
if len(versions) > 0 {
172-
sort.Slice(versions, func(i, j int) bool { return versions[i] > versions[j] })
173-
schema.Status.Version = versions[0]
174-
}
175-
176172
meta.SetStatusCondition(&schema.Status.Conditions,
177173
getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side"))
178174
metav1.SetMetaDataAnnotation(&schema.ObjectMeta, instanceIsRunningAnnotation, "true")
@@ -245,9 +241,16 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha
245241
return fmt.Errorf("cannot add Kafka Schema Subject: %w", err)
246242
}
247243

248-
// Status.ID is informational only — Observe never reads it. Drift is
249-
// detected via the annotation fingerprint written below.
244+
// Status.Version is the version that holds OUR schemaID — the version
245+
// THIS CR registered. Dependents resolve `kafkaSchemaRef` by reading
246+
// it, so it must be the version we wrote, not the registry's max
247+
// (another writer to the same subject would otherwise leak through).
248+
version, err := r.lookupVersionForID(ctx, schema, schemaID)
249+
if err != nil {
250+
return fmt.Errorf("locating version for schema ID %d: %w", schemaID, err)
251+
}
250252
schema.Status.ID = schemaID
253+
schema.Status.Version = version
251254

252255
if schema.Spec.CompatibilityLevel != "" {
253256
if _, err := r.avnGen.ServiceSchemaRegistrySubjectConfigPut(
@@ -267,9 +270,70 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha
267270
fingerprintSchema(schema, resolvedRefs),
268271
)
269272

273+
// TODO: workaround for the stale-status race in the managed reconciler.
274+
// Remove once updateStatus stops clobbering concurrently-written status fields.
275+
if err := r.persistAppliedStatus(ctx, schema); err != nil {
276+
return fmt.Errorf("persisting Status: %w", err)
277+
}
278+
270279
return nil
271280
}
272281

282+
// lookupVersionForID returns the registry version holding the given schema id.
283+
func (r *KafkaSchemaController) lookupVersionForID(
284+
ctx context.Context, schema *v1alpha1.KafkaSchema, id int,
285+
) (int, error) {
286+
versions, err := r.avnGen.ServiceSchemaRegistrySubjectVersionsGet(
287+
ctx,
288+
schema.Spec.Project,
289+
schema.Spec.ServiceName,
290+
schema.Spec.SubjectName,
291+
)
292+
if err != nil {
293+
return 0, fmt.Errorf("listing Kafka Schema versions: %w", err)
294+
}
295+
296+
sort.Slice(versions, func(i, j int) bool { return versions[i] > versions[j] })
297+
for _, v := range versions {
298+
got, err := r.avnGen.ServiceSchemaRegistrySubjectVersionGet(
299+
ctx,
300+
schema.Spec.Project,
301+
schema.Spec.ServiceName,
302+
schema.Spec.SubjectName,
303+
v,
304+
)
305+
if err != nil {
306+
return 0, fmt.Errorf("getting Kafka Schema version %d: %w", v, err)
307+
}
308+
if got.Id == id {
309+
return got.Version, nil
310+
}
311+
}
312+
313+
return 0, fmt.Errorf("%w: schema ID %d not visible in registry yet", errPreconditionNotMet, id)
314+
}
315+
316+
// persistAppliedStatus writes Status.ID and Status.Version in their own
317+
// status-subresource update, retrying on optimistic-concurrency conflicts.
318+
//
319+
// TODO: workaround for the stale-status race documented at reconciler.go:237.
320+
// Remove once updateStatus in the managed reconciler no longer clobbers
321+
// concurrently-written status fields.
322+
func (r *KafkaSchemaController) persistAppliedStatus(ctx context.Context, schema *v1alpha1.KafkaSchema) error {
323+
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
324+
latest := &v1alpha1.KafkaSchema{}
325+
if err := r.Get(ctx, client.ObjectKeyFromObject(schema), latest); err != nil {
326+
return err
327+
}
328+
latest.Status.ID = schema.Status.ID
329+
latest.Status.Version = schema.Status.Version
330+
if latest.Status.Conditions == nil {
331+
latest.Status.Conditions = []metav1.Condition{}
332+
}
333+
return r.Status().Update(ctx, latest)
334+
})
335+
}
336+
273337
// fingerprintSchema returns a stable hash of the provided schema.
274338
func fingerprintSchema(schema *v1alpha1.KafkaSchema, refs []kafkaschemaregistry.ReferenceIn) string {
275339
sorted := append([]kafkaschemaregistry.ReferenceIn(nil), refs...)

controllers/kafkaschema_controller_test.go

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,14 @@ func TestKafkaSchemaReconciler(t *testing.T) {
102102
in.References == nil
103103
}),
104104
).Return(42, nil).Once()
105+
// applySchema looks up the version that holds the freshly-written id
106+
// so Status.Version reflects what THIS CR registered.
107+
avn.EXPECT().
108+
ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName).
109+
Return([]int{1}, nil).Once()
110+
avn.EXPECT().
111+
ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 1).
112+
Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 42, Version: 1}, nil).Once()
105113

106114
r, res, err := runKafkaSchemaScenario(t, schema, avn)
107115
require.NoError(t, err)
@@ -112,6 +120,7 @@ func TestKafkaSchemaReconciler(t *testing.T) {
112120
require.Equal(t, "1", got.Annotations[processedGenerationAnnotation])
113121
require.NotContains(t, got.Annotations, instanceIsRunningAnnotation)
114122
require.Equal(t, 42, got.Status.ID)
123+
require.Equal(t, 1, got.Status.Version, "Status.Version must be the version that holds the freshly-written id")
115124
require.Equal(t, fingerprintSchema(schema, nil), got.Annotations[kafkaSchemaAppliedFingerprintAnnotation],
116125
"applySchema must record the applied fingerprint so the next Observe sees no drift")
117126
})
@@ -132,6 +141,12 @@ func TestKafkaSchemaReconciler(t *testing.T) {
132141
ServiceSchemaRegistrySubjectVersionPost(
133142
mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, mock.Anything,
134143
).Return(7, nil).Once()
144+
avn.EXPECT().
145+
ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName).
146+
Return([]int{1}, nil).Once()
147+
avn.EXPECT().
148+
ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 1).
149+
Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 7, Version: 1}, nil).Once()
135150
avn.EXPECT().
136151
ServiceSchemaRegistrySubjectConfigPut(
137152
mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName,
@@ -175,6 +190,12 @@ func TestKafkaSchemaReconciler(t *testing.T) {
175190
return ref.Name == "common.proto" && ref.Subject == "common-subject" && ref.Version == 1
176191
}),
177192
).Return(11, nil).Once()
193+
avn.EXPECT().
194+
ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName).
195+
Return([]int{1}, nil).Once()
196+
avn.EXPECT().
197+
ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 1).
198+
Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 11, Version: 1}, nil).Once()
178199

179200
r, res, err := runKafkaSchemaScenario(t, schema, avn)
180201
require.NoError(t, err)
@@ -193,6 +214,7 @@ func TestKafkaSchemaReconciler(t *testing.T) {
193214
kafkaSchemaAppliedFingerprintAnnotation: fingerprintSchema(schema, nil),
194215
}
195216
schema.Status.ID = 42
217+
schema.Status.Version = 2
196218

197219
avn := avngen.NewMockClient(t)
198220
avn.EXPECT().
@@ -209,10 +231,11 @@ func TestKafkaSchemaReconciler(t *testing.T) {
209231
got := &v1alpha1.KafkaSchema{}
210232
require.NoError(t, r.Get(t.Context(), types.NamespacedName{Name: schema.Name, Namespace: schema.Namespace}, got))
211233
require.Equal(t, "true", got.Annotations[instanceIsRunningAnnotation])
212-
// Status.Version is a breadcrumb: max of the versions list returned by
213-
// the registry. Never read for control flow.
214-
require.Equal(t, 3, got.Status.Version,
215-
"Observe must record the latest registry version as a Status breadcrumb")
234+
// Status.Version is the version THIS CR registered, written by
235+
// applySchema. Observe must not overwrite it with the registry's
236+
// max — dependents resolve `kafkaSchemaRef` through this field.
237+
require.Equal(t, 2, got.Status.Version,
238+
"Observe must not clobber Status.Version with the registry's max")
216239
})
217240

218241
t.Run("Updates KafkaSchema when fingerprint disagrees with spec", func(t *testing.T) {
@@ -239,6 +262,12 @@ func TestKafkaSchemaReconciler(t *testing.T) {
239262
ServiceSchemaRegistrySubjectVersionPost(
240263
mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, mock.Anything,
241264
).Return(99, nil).Once()
265+
avn.EXPECT().
266+
ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName).
267+
Return([]int{1, 2}, nil).Once()
268+
avn.EXPECT().
269+
ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 2).
270+
Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 99, Version: 2}, nil).Once()
242271

243272
r, res, err := runKafkaSchemaScenario(t, schema, avn)
244273
require.NoError(t, err)
@@ -249,6 +278,7 @@ func TestKafkaSchemaReconciler(t *testing.T) {
249278
require.Equal(t, "2", got.Annotations[processedGenerationAnnotation])
250279
require.NotContains(t, got.Annotations, instanceIsRunningAnnotation)
251280
require.Equal(t, 99, got.Status.ID)
281+
require.Equal(t, 2, got.Status.Version, "Status.Version must follow the freshly-written id")
252282
require.Equal(t, fingerprintSchema(schema, nil), got.Annotations[kafkaSchemaAppliedFingerprintAnnotation],
253283
"applySchema must overwrite the stale fingerprint")
254284
})
@@ -267,11 +297,17 @@ func TestKafkaSchemaReconciler(t *testing.T) {
267297
avn.EXPECT().
268298
ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName).
269299
Return([]int{1}, nil).Once()
270-
// No VersionGet expectation — Observe never iterates versions anymore.
300+
// Observe never iterates versions; only applySchema does, after POST.
271301
avn.EXPECT().
272302
ServiceSchemaRegistrySubjectVersionPost(
273303
mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, mock.Anything,
274304
).Return(7, nil).Once()
305+
avn.EXPECT().
306+
ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName).
307+
Return([]int{1}, nil).Once()
308+
avn.EXPECT().
309+
ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 1).
310+
Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 7, Version: 1}, nil).Once()
275311

276312
r, res, err := runKafkaSchemaScenario(t, schema, avn)
277313
require.NoError(t, err)
@@ -280,6 +316,7 @@ func TestKafkaSchemaReconciler(t *testing.T) {
280316
got := &v1alpha1.KafkaSchema{}
281317
require.NoError(t, r.Get(t.Context(), types.NamespacedName{Name: schema.Name, Namespace: schema.Namespace}, got))
282318
require.Equal(t, 7, got.Status.ID, "Status.ID is overwritten by the POST response")
319+
require.Equal(t, 1, got.Status.Version, "Status.Version follows the freshly-written id")
283320
require.Equal(t, fingerprintSchema(schema, nil), got.Annotations[kafkaSchemaAppliedFingerprintAnnotation],
284321
"the fingerprint annotation is the source of truth for drift detection on the next pass")
285322
})
@@ -364,6 +401,12 @@ func TestKafkaSchemaReconciler(t *testing.T) {
364401
return ref.Name == "common.proto" && ref.Subject == "resolved-subject" && ref.Version == 7
365402
}),
366403
).Return(11, nil).Once()
404+
avn.EXPECT().
405+
ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName).
406+
Return([]int{1}, nil).Once()
407+
avn.EXPECT().
408+
ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 1).
409+
Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 11, Version: 1}, nil).Once()
367410

368411
r, res, err := runKafkaSchemaScenario(t, schema, avn, referent)
369412
require.NoError(t, err)
@@ -424,6 +467,12 @@ func TestKafkaSchemaReconciler(t *testing.T) {
424467
second.Name == "shared.proto" && second.Subject == "shared-subject" && second.Version == 3
425468
}),
426469
).Return(55, nil).Once()
470+
avn.EXPECT().
471+
ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName).
472+
Return([]int{1}, nil).Once()
473+
avn.EXPECT().
474+
ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 1).
475+
Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 55, Version: 1}, nil).Once()
427476

428477
r, res, err := runKafkaSchemaScenario(t, schema, avn, referent)
429478
require.NoError(t, err)
@@ -587,6 +636,12 @@ func TestKafkaSchemaReconciler(t *testing.T) {
587636
ref.Version == 7
588637
}),
589638
).Return(99, nil).Once()
639+
avn.EXPECT().
640+
ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, dependent.Spec.Project, dependent.Spec.ServiceName, dependent.Spec.SubjectName).
641+
Return([]int{1}, nil).Once()
642+
avn.EXPECT().
643+
ServiceSchemaRegistrySubjectVersionGet(mock.Anything, dependent.Spec.Project, dependent.Spec.ServiceName, dependent.Spec.SubjectName, 1).
644+
Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 99, Version: 1}, nil).Once()
590645

591646
res, err = r.Reconcile(t.Context(), ctrlruntime.Request{
592647
NamespacedName: types.NamespacedName{Name: dependent.Name, Namespace: dependent.Namespace},
@@ -663,6 +718,12 @@ func TestKafkaSchemaReconciler(t *testing.T) {
663718
return ref.Name == "common.proto" && ref.Subject == "resolved-subject" && ref.Version == 2
664719
}),
665720
).Return(60, nil).Once()
721+
avn.EXPECT().
722+
ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName).
723+
Return([]int{1, 2}, nil).Once()
724+
avn.EXPECT().
725+
ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 2).
726+
Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 60, Version: 2}, nil).Once()
666727

667728
r, res, err := runKafkaSchemaScenario(t, schema, avn, referent)
668729
require.NoError(t, err)
@@ -712,6 +773,12 @@ func TestKafkaSchemaReconciler(t *testing.T) {
712773
return in.References == nil
713774
}),
714775
).Return(78, nil).Once()
776+
avn.EXPECT().
777+
ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName).
778+
Return([]int{3, 4}, nil).Once()
779+
avn.EXPECT().
780+
ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 4).
781+
Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 78, Version: 4}, nil).Once()
715782

716783
r, res, err := runKafkaSchemaScenario(t, schema, avn)
717784
require.NoError(t, err)

0 commit comments

Comments
 (0)