Commit 96d522e

freeznet and Copilot authored
feat: Enhance PulsarNamespace with schema management capabilities (#323)
* feat: Enhance PulsarNamespace with schema management capabilities

  - Added SchemaCompatibilityStrategy and SchemaValidationEnforced fields to PulsarNamespaceSpec for better schema evolution control.
  - Updated CRD and documentation to reflect new schema management options.
  - Modified reconciliation logic to handle schema validation enforcement.
  - Adjusted deep copy methods to accommodate new fields.

* fix update policy

* Update pkg/admin/impl.go

  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* ping to fix pr

* go mod tidy

* revert

* fix dep

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 21f350e commit 96d522e

10 files changed

Lines changed: 1105 additions & 333 deletions

File tree

api/v1alpha1/pulsarnamespace_types.go

Lines changed: 14 additions & 0 deletions
@@ -19,6 +19,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
+	adminutils "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
 	"github.com/streamnative/pulsar-resources-operator/pkg/utils"
 )
 
@@ -64,6 +65,19 @@ type PulsarNamespaceSpec struct {
 	// +optional
 	LifecyclePolicy PulsarResourceLifeCyclePolicy `json:"lifecyclePolicy,omitempty"`
 
+	// SchemaCompatibilityStrategy defines the schema compatibility strategy for this namespace.
+	// If not specified, the cluster's default schema compatibility strategy will be used.
+	// This setting controls how schema evolution is handled for topics within this namespace.
+	// +optional
+	// +kubebuilder:validation:Enum=AutoUpdateDisabled;Backward;Forward;Full;AlwaysCompatible;BackwardTransitive;ForwardTransitive;FullTransitive
+	SchemaCompatibilityStrategy *adminutils.SchemaCompatibilityStrategy `json:"schemaCompatibilityStrategy,omitempty"`
+
+	// SchemaValidationEnforced controls whether schema validation is enforced for this namespace.
+	// When enabled, producers must provide a schema when publishing messages.
+	// If not specified, the cluster's default schema validation enforcement setting will be used.
+	// +optional
+	SchemaValidationEnforced *bool `json:"schemaValidationEnforced,omitempty"`
+
 	// MaxProducersPerTopic sets the maximum number of producers allowed on a single topic in the namespace.
 	// +optional
 	MaxProducersPerTopic *int32 `json:"maxProducersPerTopic,omitempty"`
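Both new fields are pointers tagged `omitempty`, which is what lets an unset value be told apart from an explicit `false`. The following is a minimal sketch of that behaviour, not the operator's code: `schemaFields` and `encode` are hypothetical names, and the strategy is modelled as a plain `*string` so the example does not depend on `pulsar-client-go`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// schemaFields is a hypothetical stand-in for the two fields added to
// PulsarNamespaceSpec. The strategy is a plain string here (assumption)
// instead of adminutils.SchemaCompatibilityStrategy.
type schemaFields struct {
	SchemaCompatibilityStrategy *string `json:"schemaCompatibilityStrategy,omitempty"`
	SchemaValidationEnforced    *bool   `json:"schemaValidationEnforced,omitempty"`
}

// encode marshals the fields and returns the JSON text.
func encode(f schemaFields) string {
	b, err := json.Marshal(f)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	// Unset: both pointers are nil, so omitempty drops the keys entirely.
	fmt.Println(encode(schemaFields{}))

	// Explicit false: the pointer is non-nil, so the key is kept even
	// though the pointed-to value is the zero value for bool.
	enforced := false
	fmt.Println(encode(schemaFields{SchemaValidationEnforced: &enforced}))
}
```

With a plain `bool` field, `omitempty` would drop `false` as well, and "disable enforcement" would be indistinguishable from "use the cluster default".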

api/v1alpha1/zz_generated.deepcopy.go

Lines changed: 19 additions & 8 deletions
Some generated files are not rendered by default.

config/crd/bases/resource.streamnative.io_pulsarnamespaces.yaml

Lines changed: 21 additions & 0 deletions
@@ -238,6 +238,27 @@ spec:
                   Should be set in conjunction with RetentionSize for effective retention policy.
                   Retention Quota must exceed configured backlog quota for namespace
                 type: string
+              schemaCompatibilityStrategy:
+                description: |-
+                  SchemaCompatibilityStrategy defines the schema compatibility strategy for this namespace.
+                  If not specified, the cluster's default schema compatibility strategy will be used.
+                  This setting controls how schema evolution is handled for topics within this namespace.
+                enum:
+                - AutoUpdateDisabled
+                - Backward
+                - Forward
+                - Full
+                - AlwaysCompatible
+                - BackwardTransitive
+                - ForwardTransitive
+                - FullTransitive
+                type: string
+              schemaValidationEnforced:
+                description: |-
+                  SchemaValidationEnforced controls whether schema validation is enforced for this namespace.
+                  When enabled, producers must provide a schema when publishing messages.
+                  If not specified, the cluster's default schema validation enforcement setting will be used.
+                type: boolean
               topicAutoCreationConfig:
                 description: |-
                   TopicAutoCreationConfig controls whether automatic topic creation is allowed in this namespace
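The `enum` in the CRD means the API server rejects any other string before the operator sees it. The sketch below mirrors that check in Go for illustration only; `allowedStrategies` and `validateStrategy` are hypothetical names that do not appear in this commit, and the match is exact and case-sensitive, as enum validation is.

```go
package main

import "fmt"

// allowedStrategies mirrors the enum values listed in the CRD hunk above.
var allowedStrategies = map[string]bool{
	"AutoUpdateDisabled": true,
	"Backward":           true,
	"Forward":            true,
	"Full":               true,
	"AlwaysCompatible":   true,
	"BackwardTransitive": true,
	"ForwardTransitive":  true,
	"FullTransitive":     true,
}

// validateStrategy is a hypothetical helper that reports an error for any
// value outside the enum. Matching is exact, so "backward" is rejected.
func validateStrategy(s string) error {
	if !allowedStrategies[s] {
		return fmt.Errorf("unsupported schemaCompatibilityStrategy %q", s)
	}
	return nil
}

func main() {
	for _, s := range []string{"Backward", "backward", "FullTransitive"} {
		fmt.Printf("%s: %v\n", s, validateStrategy(s))
	}
}
```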

docs/pulsar_namespace.md

Lines changed: 89 additions & 1 deletion
@@ -29,6 +29,8 @@ The `PulsarNamespace` resource defines a namespace in a Pulsar cluster. It allow
 | `deduplication` | Whether to enable message deduplication for the namespace. | No |
 | `bookieAffinityGroup` | Set the bookie-affinity group for the namespace, which has two sub fields: `bookkeeperAffinityGroupPrimary(String)` is required, and `bookkeeperAffinityGroupSecondary(String)` is optional. | No |
 | `topicAutoCreationConfig` | Configures automatic topic creation behavior within this namespace. Contains settings for whether auto-creation is allowed, the type of topics created, and default number of partitions. | No |
+| `schemaCompatibilityStrategy` | Schema compatibility strategy for this namespace. Controls how schema evolution is handled for topics within this namespace. Options: `AutoUpdateDisabled`, `Backward`, `Forward`, `Full`, `AlwaysCompatible`, `BackwardTransitive`, `ForwardTransitive`, `FullTransitive`. | No |
+| `schemaValidationEnforced` | Controls whether schema validation is enforced for this namespace. When enabled, producers must provide a schema when publishing messages. If not specified, the cluster's default schema validation enforcement setting will be used. | No |
 
 Note: Valid time units are "s" (seconds), "m" (minutes), "h" (hours), "d" (days), "w" (weeks).
 
@@ -70,6 +72,89 @@ This configuration overrides the broker's default topic auto-creation settings f
 ```
 This explicitly disables topic auto-creation for the namespace, overriding any broker-level settings that might enable it.
 
+## Schema Management
+
+Pulsar provides powerful schema management capabilities at the namespace level, allowing you to control how schema evolution is handled and whether schema validation is enforced. This feature consists of two complementary settings: schema compatibility strategy and schema validation enforcement.
+
+### Schema Compatibility Strategy
+
+The `schemaCompatibilityStrategy` field controls how Pulsar handles schema evolution for topics within the namespace. This allows you to configure different compatibility requirements for different namespaces based on your use case.
+
+#### Available Strategies
+
+1. **AutoUpdateDisabled**: Disables automatic schema updates and requires manual schema management. This is the most restrictive strategy, suitable for ultra-stable environments where strict schema control is required and no automatic schema evolution is desired.
+
+2. **AlwaysCompatible**: Allows any schema changes without validation. This is the most permissive strategy but may lead to compatibility issues. Suitable for development/testing environments.
+
+3. **Backward**: New schema can read data written with the previous schema. This strategy supports consumer-driven schema evolution, such as adding optional fields or removing fields.
+
+4. **BackwardTransitive**: New schema can read data written with any previous schema in the chain. This provides long-term backward compatibility across multiple schema versions.
+
+5. **Forward**: Previous schema can read data written with the new schema. This strategy supports producer-driven schema evolution, such as adding fields that older consumers can ignore.
+
+6. **ForwardTransitive**: Any previous schema can read data written with the new schema. This ensures new data is readable by any older schema version.
+
+7. **Full**: Schema changes are both forward and backward compatible. Both new and previous schemas can read data written by either schema. This provides strict compatibility requirements in both directions.
+
+8. **FullTransitive**: Schema changes are forward and backward compatible with all schemas. Any schema in the chain can read data written by any other schema in the chain. This provides maximum compatibility guarantees.
+
+#### Usage Examples
+
+**Development Environment**:
+```yaml
+schemaCompatibilityStrategy: AlwaysCompatible
+```
+
+**Production Environment**:
+```yaml
+schemaCompatibilityStrategy: Backward
+```
+
+**Critical Systems**:
+```yaml
+schemaCompatibilityStrategy: FullTransitive
+```
+
+### Schema Validation Enforcement
+
+The `schemaValidationEnforced` field controls whether producers must provide a schema when publishing messages to topics within the namespace.
+
+- **When enabled (`true`)**: Producers must provide a schema when publishing messages. Messages without schemas will be rejected. This ensures all data in the namespace has a defined structure and is recommended for production environments where data consistency is critical.
+
+- **When disabled (`false`)**: Producers can publish messages with or without schemas. This allows for more flexibility in message publishing and is useful for development/testing environments or legacy integrations.
+
+- **Default behavior**: If `schemaValidationEnforced` is not specified, the cluster's default schema validation enforcement setting will be used.
+
+### Configuration Examples by Use Case
+
+#### Development/Testing Environment
+```yaml
+schemaCompatibilityStrategy: AlwaysCompatible
+schemaValidationEnforced: false
+```
+This configuration allows rapid schema iteration and flexible schema validation for experimentation.
+
+#### Standard Production Environment
+```yaml
+schemaCompatibilityStrategy: Backward
+schemaValidationEnforced: true
+```
+This provides a good balance between flexibility and safety, ensuring consumers can handle schema changes while enforcing schema validation for data consistency.
+
+#### Mission-Critical Systems
+```yaml
+schemaCompatibilityStrategy: FullTransitive
+schemaValidationEnforced: true
+```
+This configuration provides maximum compatibility guarantees with strict schema validation enforcement.
+
+#### Legacy System Integration
+```yaml
+schemaCompatibilityStrategy: ForwardTransitive
+schemaValidationEnforced: false
+```
+This ensures older systems can consume new data while allowing gradual migration without strict schema requirements.
+
 ## replicationClusters vs geoReplicationRefs
 
 The `replicationClusters` and `geoReplicationRefs` fields serve different purposes in configuring replication for a Pulsar namespace:
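Six of the eight strategies documented in this hunk differ along two axes only: which direction is checked, and whether the check covers just the previous schema or every earlier version. The table-style sketch below restates the documentation text in Go; it is illustrative rather than Pulsar's implementation, and `checks`, `strategyChecks`, and `lookup` are hypothetical names. `AlwaysCompatible` and `AutoUpdateDisabled` are left out because the text describes them as skipping validation and disabling automatic updates, not as a read direction.

```go
package main

import "fmt"

// checks summarizes what a strategy verifies, per the documentation text.
type checks struct {
	Backward   bool // new schema can read data written with older schema(s)
	Forward    bool // older schema(s) can read data written with the new schema
	Transitive bool // compared against every earlier version, not only the previous one
}

// strategyChecks covers the six direction-based strategies.
var strategyChecks = map[string]checks{
	"Backward":           {Backward: true},
	"BackwardTransitive": {Backward: true, Transitive: true},
	"Forward":            {Forward: true},
	"ForwardTransitive":  {Forward: true, Transitive: true},
	"Full":               {Backward: true, Forward: true},
	"FullTransitive":     {Backward: true, Forward: true, Transitive: true},
}

// lookup returns the checks for a strategy; ok is false for strategies
// that are not direction-based or are unknown.
func lookup(strategy string) (c checks, ok bool) {
	c, ok = strategyChecks[strategy]
	return c, ok
}

func main() {
	// A fixed slice keeps the printed order deterministic.
	names := []string{
		"Backward", "BackwardTransitive",
		"Forward", "ForwardTransitive",
		"Full", "FullTransitive",
	}
	for _, name := range names {
		c, _ := lookup(name)
		fmt.Printf("%-19s backward=%-5t forward=%-5t transitive=%t\n",
			name, c.Backward, c.Forward, c.Transitive)
	}
}
```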
@@ -105,6 +190,9 @@ spec:
   backlogQuotaLimitTime: 24h
   bundles: 16
   messageTTL: 1h
+  # Schema management configuration
+  # schemaCompatibilityStrategy: Backward
+  # schemaValidationEnforced: true
   # backlogQuotaRetentionPolicy: producer_request_hold
   # maxProducersPerTopic: 2
   # maxConsumersPerTopic: 2
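In the sample manifest the schema fields are commented out, so, per the documentation in this commit, the cluster's defaults apply. A minimal sketch of that fallback rule follows; `effectiveEnforcement` is a hypothetical helper that models the documented behaviour and is not taken from the operator's reconciliation code.

```go
package main

import "fmt"

// effectiveEnforcement returns the namespace-level setting when it is
// present and falls back to the cluster default when it is nil (unset).
func effectiveEnforcement(namespaceSetting *bool, clusterDefault bool) bool {
	if namespaceSetting == nil {
		return clusterDefault
	}
	return *namespaceSetting
}

func main() {
	enforced := true

	// Field commented out in the manifest: the cluster default is used.
	fmt.Println(effectiveEnforcement(nil, false))

	// Field set explicitly: the namespace value overrides the default.
	fmt.Println(effectiveEnforcement(&enforced, false))
}
```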
@@ -144,7 +232,7 @@ Please note the following important points:
 
 1. The fields `name` and `bundles` cannot be updated after the namespace is created. These are immutable properties of the namespace.
 
-2. Other fields such as `backlogQuotaLimitSize`, `backlogQuotaLimitTime`, `messageTTL`, `maxProducersPerTopic`, `maxConsumersPerTopic`, `maxConsumersPerSubscription`, `retentionTime`, `retentionSize`, and `topicAutoCreationConfig` can be modified.
+2. Other fields such as `backlogQuotaLimitSize`, `backlogQuotaLimitTime`, `messageTTL`, `maxProducersPerTopic`, `maxConsumersPerTopic`, `maxConsumersPerSubscription`, `retentionTime`, `retentionSize`, `topicAutoCreationConfig`, `schemaCompatibilityStrategy`, and `schemaValidationEnforced` can be modified.
 
 3. If you want to change the `connectionRef`, ensure that the new PulsarConnection resource exists and is properly configured. Changing the `connectionRef` can have significant implications:
 