Skip to content

Commit 1e63801

Browse files
authored
feat(kafka_schema): added kafka schema references (#1176)
1 parent 9cc5c40 commit 1e63801

14 files changed

Lines changed: 2101 additions & 202 deletions

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,15 @@
22

33
## [MAJOR.MINOR.PATCH] - YYYY-MM-DD
44

5+
- Add `KafkaSchema` field `references[].kafkaSchemaRef`, type `object`: Reference to another
6+
`KafkaSchema` resource in the same namespace. The subject and version are resolved from the
7+
referenced resource's spec and status, dependents pick up new versions automatically.
8+
Mutually exclusive with `subject` + `version` on the same entry.
9+
- Change `KafkaSchema` field `references`: maxItems `100`
10+
- **BREAKING**: Change `KafkaSchema` deletion to perform a hard delete instead of soft delete only.
11+
The subject is no longer visible in the registry's listing after deletion,
12+
and re-applying a `KafkaSchema` with the same `subjectName` after deletion starts at version 1.
13+
514
## v0.38.0 - 2026-05-18
615

716
- Add `MySQL` and `PostgreSQL` field `migrationSecretSource`, type `object`: Reference to a Secret containing migration

api/v1alpha1/kafkaschema_types.go

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package v1alpha1
55
import (
66
"github.com/aiven/go-client-codegen/handler/kafkaschemaregistry"
77
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
8+
"k8s.io/apimachinery/pkg/types"
89
)
910

1011
// KafkaSchemaSpec defines the desired state of KafkaSchema
@@ -16,7 +17,7 @@ type KafkaSchemaSpec struct {
1617
// Kafka Schema Subject name
1718
SubjectName string `json:"subjectName"`
1819

19-
// Kafka Schema configuration should be a valid Avro Schema JSON format
20+
// Kafka Schema definition. Format depends on schemaType (AVRO/JSON/PROTOBUF)
2021
Schema string `json:"schema"`
2122

2223
// +kubebuilder:validation:Enum=AVRO;JSON;PROTOBUF
@@ -28,23 +29,49 @@ type KafkaSchemaSpec struct {
2829
// Kafka Schemas compatibility level
2930
CompatibilityLevel kafkaschemaregistry.CompatibilityType `json:"compatibilityLevel,omitempty"`
3031

31-
// Schema references for Protobuf or JSON schemas that import other schemas
32+
// +kubebuilder:validation:MaxItems=100
33+
// +listType=map
34+
// +listMapKey=name
35+
// Schema references for Protobuf or JSON schemas that import other schemas.
36+
// References must form a directed acyclic graph (DAG); cycles are not allowed.
3237
References []SchemaReference `json:"references,omitempty"`
3338
}
3439

35-
// SchemaReference is a reference to another schema in the registry
40+
// SchemaReference is a reference to another schema in the registry.
41+
// Exactly one of {subject+version} or kafkaSchemaRef must be set.
42+
// +kubebuilder:validation:XValidation:rule="(has(self.subject) && has(self.version) && !has(self.kafkaSchemaRef)) || (!has(self.subject) && !has(self.version) && has(self.kafkaSchemaRef))",message="set both subject and version, or set kafkaSchemaRef, but not both"
3643
type SchemaReference struct {
3744
// +kubebuilder:validation:MinLength=1
45+
// +kubebuilder:validation:MaxLength=512
3846
// Name used to reference the schema (e.g., the import path in Protobuf)
3947
Name string `json:"name"`
4048

4149
// +kubebuilder:validation:MinLength=1
42-
// Subject name of the referenced schema in the registry
43-
Subject string `json:"subject"`
50+
// +kubebuilder:validation:MaxLength=512
51+
// Subject name of the referenced schema in the registry. Mutually exclusive with kafkaSchemaRef.
52+
// +optional
53+
Subject string `json:"subject,omitempty"`
4454

4555
// +kubebuilder:validation:Minimum=1
46-
// Version of the referenced schema
47-
Version int `json:"version"`
56+
// Version of the referenced schema. Mutually exclusive with kafkaSchemaRef.
57+
// +optional
58+
Version int `json:"version,omitempty"`
59+
60+
// Reference to another KafkaSchema resource in the same namespace.
61+
// Mutually exclusive with subject/version.
62+
//
63+
// Cleanup order matters: delete the dependent before the referent.
64+
// +optional
65+
KafkaSchemaRef *LocalKafkaSchemaRef `json:"kafkaSchemaRef,omitempty"`
66+
}
67+
68+
// LocalKafkaSchemaRef references another KafkaSchema in the same namespace as the owner.
69+
// Cross-namespace references are not supported to avoid confused-deputy situations in multi-tenant clusters.
70+
type LocalKafkaSchemaRef struct {
71+
// +kubebuilder:validation:MinLength=1
72+
// +kubebuilder:validation:MaxLength=253
73+
// Name of the KafkaSchema resource in the same namespace.
74+
Name string `json:"name"`
4875
}
4976

5077
// KafkaSchemaStatus defines the observed state of KafkaSchema
@@ -62,12 +89,20 @@ type KafkaSchemaStatus struct {
6289
// +kubebuilder:object:root=true
6390
// +kubebuilder:subresource:status
6491

65-
// KafkaSchema is the Schema for the kafkaschemas API
92+
// KafkaSchema is the Schema for the kafkaschemas API.
93+
//
94+
// Self-references (A -> A) are blocked at admission; transitive cycles
95+
// (A -> B -> A) are not detected at admission time.
96+
//
97+
// Deletion: the operator performs a soft delete followed by a hard delete on
98+
// the subject. The subject disappears from the registry's listing, re-applying a KafkaSchema with the same subjectName
99+
// after deletion starts a brand-new subject at version 1.
66100
// +kubebuilder:printcolumn:name="Service Name",type="string",JSONPath=".spec.serviceName"
67101
// +kubebuilder:printcolumn:name="Project",type="string",JSONPath=".spec.project"
68102
// +kubebuilder:printcolumn:name="Subject",type="string",JSONPath=".spec.subjectName"
69103
// +kubebuilder:printcolumn:name="Compatibility Level",type="string",JSONPath=".spec.compatibilityLevel"
70104
// +kubebuilder:printcolumn:name="Version",type="number",JSONPath=".status.version"
105+
// +kubebuilder:validation:XValidation:rule="!has(self.spec.references) || self.spec.references.all(r, !has(r.kafkaSchemaRef) || r.kafkaSchemaRef.name != self.metadata.name)",message="kafkaSchemaRef cannot point to the KafkaSchema itself"
71106
type KafkaSchema struct {
72107
metav1.TypeMeta `json:",inline"`
73108
metav1.ObjectMeta `json:"metadata,omitempty"`
@@ -94,6 +129,25 @@ func (in *KafkaSchema) GetObjectMeta() *metav1.ObjectMeta {
94129
return &in.ObjectMeta
95130
}
96131

132+
// GetRefs returns ResourceReferenceObjects for any kafkaSchemaRef entries in Spec.References.
133+
// The namespace is always the owner's namespace; refs are same-namespace only by design.
134+
func (in *KafkaSchema) GetRefs() []*ResourceReferenceObject {
135+
refs := make([]*ResourceReferenceObject, 0, len(in.Spec.References))
136+
for _, ref := range in.Spec.References {
137+
if ref.KafkaSchemaRef == nil {
138+
continue
139+
}
140+
refs = append(refs, &ResourceReferenceObject{
141+
GroupVersionKind: GroupVersion.WithKind("KafkaSchema"),
142+
NamespacedName: types.NamespacedName{
143+
Namespace: in.GetNamespace(),
144+
Name: ref.KafkaSchemaRef.Name,
145+
},
146+
})
147+
}
148+
return refs
149+
}
150+
97151
// +kubebuilder:object:root=true
98152

99153
// KafkaSchemaList contains a list of KafkaSchema

api/v1alpha1/zz_generated.deepcopy.go

Lines changed: 23 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml

Lines changed: 58 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,15 @@ spec:
3333
name: v1alpha1
3434
schema:
3535
openAPIV3Schema:
36-
description: KafkaSchema is the Schema for the kafkaschemas API
36+
description: |-
37+
KafkaSchema is the Schema for the kafkaschemas API.
38+
39+
Self-references (A -> A) are blocked at admission; transitive cycles
40+
(A -> B -> A) are not detected at admission time.
41+
42+
Deletion: the operator performs a soft delete followed by a hard delete on
43+
the subject. The subject disappears from the registry's listing, re-applying a KafkaSchema with the same subjectName
44+
after deletion starts a brand-new subject at version 1.
3745
properties:
3846
apiVersion:
3947
description: |-
@@ -88,38 +96,70 @@ spec:
8896
- message: Value is immutable
8997
rule: self == oldSelf
9098
references:
91-
description:
92-
Schema references for Protobuf or JSON schemas that import
93-
other schemas
99+
description: |-
100+
Schema references for Protobuf or JSON schemas that import other schemas.
101+
References must form a directed acyclic graph (DAG); cycles are not allowed.
94102
items:
95-
description:
96-
SchemaReference is a reference to another schema in
97-
the registry
103+
description: |-
104+
SchemaReference is a reference to another schema in the registry.
105+
Exactly one of {subject+version} or kafkaSchemaRef must be set.
98106
properties:
107+
kafkaSchemaRef:
108+
description: |-
109+
Reference to another KafkaSchema resource in the same namespace.
110+
Mutually exclusive with subject/version.
111+
112+
Cleanup order matters: delete the dependent before the referent.
113+
properties:
114+
name:
115+
description:
116+
Name of the KafkaSchema resource in the same
117+
namespace.
118+
maxLength: 253
119+
minLength: 1
120+
type: string
121+
required:
122+
- name
123+
type: object
99124
name:
100125
description:
101126
Name used to reference the schema (e.g., the import
102127
path in Protobuf)
128+
maxLength: 512
103129
minLength: 1
104130
type: string
105131
subject:
106-
description: Subject name of the referenced schema in the registry
132+
description:
133+
Subject name of the referenced schema in the registry.
134+
Mutually exclusive with kafkaSchemaRef.
135+
maxLength: 512
107136
minLength: 1
108137
type: string
109138
version:
110-
description: Version of the referenced schema
139+
description:
140+
Version of the referenced schema. Mutually exclusive
141+
with kafkaSchemaRef.
111142
minimum: 1
112143
type: integer
113144
required:
114145
- name
115-
- subject
116-
- version
117146
type: object
147+
x-kubernetes-validations:
148+
- message:
149+
set both subject and version, or set kafkaSchemaRef,
150+
but not both
151+
rule:
152+
(has(self.subject) && has(self.version) && !has(self.kafkaSchemaRef))
153+
|| (!has(self.subject) && !has(self.version) && has(self.kafkaSchemaRef))
154+
maxItems: 100
118155
type: array
156+
x-kubernetes-list-map-keys:
157+
- name
158+
x-kubernetes-list-type: map
119159
schema:
120160
description:
121-
Kafka Schema configuration should be a valid Avro Schema
122-
JSON format
161+
Kafka Schema definition. Format depends on schemaType
162+
(AVRO/JSON/PROTOBUF)
123163
type: string
124164
schemaType:
125165
description: Schema type
@@ -235,6 +275,11 @@ spec:
235275
- version
236276
type: object
237277
type: object
278+
x-kubernetes-validations:
279+
- message: kafkaSchemaRef cannot point to the KafkaSchema itself
280+
rule:
281+
"!has(self.spec.references) || self.spec.references.all(r, !has(r.kafkaSchemaRef)
282+
|| r.kafkaSchemaRef.name != self.metadata.name)"
238283
served: true
239284
storage: true
240285
subresources:

0 commit comments

Comments
 (0)