diff --git a/api/v1alpha1/apikey_types.go b/api/v1alpha1/apikey_types.go index a0e975e2..9ceba0d2 100644 --- a/api/v1alpha1/apikey_types.go +++ b/api/v1alpha1/apikey_types.go @@ -44,7 +44,7 @@ type APIKeySpec struct { // Revoke indicates whether this API key should be revoked // +optional - Revoke bool `json:"revoke,omitempty"` + Revoke bool `json:"revoke"` // EncryptionKey contains the public key used to encrypt the token // +optional @@ -52,7 +52,7 @@ type APIKeySpec struct { // ExportPlaintextToken indicates whether the token should be exported in plaintext // +optional - ExportPlaintextToken *bool `json:"exportPlaintextToken,omitempty"` + ExportPlaintextToken *bool `json:"exportPlaintextToken"` } // EncryptionKey contains a public key used for encryption diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index 06946c4d..0aac2634 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -170,7 +170,7 @@ type ProducerConfig struct { MaxPendingMessagesAcrossPartitions int `json:"maxPendingMessagesAcrossPartitions,omitempty" yaml:"maxPendingMessagesAcrossPartitions"` // +optional - UseThreadLocalProducers bool `json:"useThreadLocalProducers,omitempty" yaml:"useThreadLocalProducers"` + UseThreadLocalProducers bool `json:"useThreadLocalProducers" yaml:"useThreadLocalProducers"` // +optional CryptoConfig *CryptoConfig `json:"cryptoConfig,omitempty" yaml:"cryptoConfig"` @@ -191,7 +191,7 @@ type ConsumerConfig struct { SerdeClassName string `json:"serdeClassName,omitempty" yaml:"serdeClassName"` // +optional - RegexPattern bool `json:"regexPattern,omitempty" yaml:"regexPattern"` + RegexPattern bool `json:"regexPattern" yaml:"regexPattern"` // +optional ReceiverQueueSize int `json:"receiverQueueSize,omitempty" yaml:"receiverQueueSize"` @@ -206,7 +206,7 @@ type ConsumerConfig struct { CryptoConfig *CryptoConfig `json:"cryptoConfig,omitempty" yaml:"cryptoConfig"` // +optional - PoolMessages bool `json:"poolMessages,omitempty" yaml:"poolMessages"` + PoolMessages bool `json:"poolMessages" yaml:"poolMessages"` } // CryptoConfig represents the configuration for the crypto of the pulsar functions and connectors diff --git a/api/v1alpha1/computeflinkdeployment_types.go b/api/v1alpha1/computeflinkdeployment_types.go index e29c94a2..2e1e63fe 100644 --- a/api/v1alpha1/computeflinkdeployment_types.go +++ b/api/v1alpha1/computeflinkdeployment_types.go @@ -378,7 +378,7 @@ type ComputeFlinkDeploymentList struct { // VvpRestoreStrategy defines the restore strategy of the deployment type VvpRestoreStrategy struct { - AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"` + AllowNonRestoredState bool `json:"allowNonRestoredState"` Kind string `json:"kind,omitempty"` } diff --git a/api/v1alpha1/computeworkspace_types.go b/api/v1alpha1/computeworkspace_types.go index d32e02f7..c874f682 100644 --- a/api/v1alpha1/computeworkspace_types.go +++ b/api/v1alpha1/computeworkspace_types.go @@ -38,7 +38,7 @@ type ComputeWorkspaceSpec struct { // UseExternalAccess is the flag to indicate whether the workspace will use external access. // +optional - UseExternalAccess *bool `json:"useExternalAccess,omitempty"` + UseExternalAccess *bool `json:"useExternalAccess"` // FlinkBlobStorage is the configuration for the Flink blob storage. // +optional diff --git a/api/v1alpha1/pulsarconnection_types.go b/api/v1alpha1/pulsarconnection_types.go index b58e4653..1da4d848 100644 --- a/api/v1alpha1/pulsarconnection_types.go +++ b/api/v1alpha1/pulsarconnection_types.go @@ -78,11 +78,11 @@ type PulsarConnectionSpec struct { // TLSEnableHostnameVerification indicates whether to verify the hostname of the broker. // Only used when using secure urls. // +optional - TLSEnableHostnameVerification bool `json:"tlsEnableHostnameVerification,omitempty"` + TLSEnableHostnameVerification bool `json:"tlsEnableHostnameVerification"` // TLSAllowInsecureConnection indicates whether to allow insecure connection to the broker. // +optional - TLSAllowInsecureConnection bool `json:"tlsAllowInsecureConnection,omitempty"` + TLSAllowInsecureConnection bool `json:"tlsAllowInsecureConnection"` // TLSTrustCertsFilePath Path for the TLS certificate used to validate the broker endpoint when using TLS. // +optional diff --git a/api/v1alpha1/pulsarfunction_types.go b/api/v1alpha1/pulsarfunction_types.go index b5027d67..f5ca2b21 100644 --- a/api/v1alpha1/pulsarfunction_types.go +++ b/api/v1alpha1/pulsarfunction_types.go @@ -38,15 +38,15 @@ type PulsarFunctionSpec struct { // CleanupSubscription is the flag to indicate whether the subscription should be cleaned up when the function is deleted // +optional - CleanupSubscription *bool `json:"cleanupSubscription,omitempty"` + CleanupSubscription *bool `json:"cleanupSubscription"` // RetainOrdering is the flag to indicate whether the function should retain ordering // +optional - RetainOrdering *bool `json:"retainOrdering,omitempty"` + RetainOrdering *bool `json:"retainOrdering"` // RetainKeyOrdering is the flag to indicate whether the function should retain key ordering // +optional - RetainKeyOrdering *bool `json:"retainKeyOrdering,omitempty"` + RetainKeyOrdering *bool `json:"retainKeyOrdering"` // BatchBuilder is the batch builder that the function uses // +optional @@ -54,11 +54,11 @@ type PulsarFunctionSpec struct { // ForwardSourceMessageProperty is the flag to indicate whether the function should forward source message properties // +optional - ForwardSourceMessageProperty *bool `json:"forwardSourceMessageProperty,omitempty"` + ForwardSourceMessageProperty *bool `json:"forwardSourceMessageProperty"` // AutoAck is the flag to indicate whether the function should auto ack // +optional - AutoAck *bool `json:"autoAck,omitempty"` + AutoAck *bool `json:"autoAck"` // Parallelism is the parallelism of the function // +optional @@ -186,11 +186,11 @@ type PulsarFunctionSpec struct { // ExposePulsarAdminClientEnabled is the flag to indicate whether the function should expose pulsar admin client // +optional - ExposePulsarAdminClientEnabled *bool `json:"exposePulsarAdminClientEnabled,omitempty"` + ExposePulsarAdminClientEnabled *bool `json:"exposePulsarAdminClientEnabled"` // SkipToLatest is the flag to indicate whether the function should skip to latest // +optional - SkipToLatest *bool `json:"skipToLatest,omitempty"` + SkipToLatest *bool `json:"skipToLatest"` // SubscriptionPosition is the subscription position of the function // +optional diff --git a/api/v1alpha1/pulsarnamespace_types.go b/api/v1alpha1/pulsarnamespace_types.go index b4a365f8..d30bab8f 100644 --- a/api/v1alpha1/pulsarnamespace_types.go +++ b/api/v1alpha1/pulsarnamespace_types.go @@ -26,7 +26,7 @@ import ( // TopicAutoCreationConfig defines the configuration for automatic topic creation type TopicAutoCreationConfig struct { // Allow specifies whether to allow automatic topic creation - Allow bool `json:"allow,omitempty"` + Allow bool `json:"allow"` // Type specifies the type of automatically created topics // +kubebuilder:validation:Enum=partitioned;non-partitioned @@ -125,7 +125,7 @@ type InactiveTopicPolicies struct { // DeleteWhileInactive specifies whether to delete topics while they are inactive // +optional - DeleteWhileInactive *bool `json:"deleteWhileInactive,omitempty"` + DeleteWhileInactive *bool `json:"deleteWhileInactive"` } // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! @@ -167,7 +167,7 @@ type PulsarNamespaceSpec struct { // When enabled, producers must provide a schema when publishing messages. // If not specified, the cluster's default schema validation enforcement setting will be used. // +optional - SchemaValidationEnforced *bool `json:"schemaValidationEnforced,omitempty"` + SchemaValidationEnforced *bool `json:"schemaValidationEnforced"` // MaxProducersPerTopic sets the maximum number of producers allowed on a single topic in the namespace. // +optional @@ -246,7 +246,7 @@ type PulsarNamespaceSpec struct { // Deduplication controls whether to enable message deduplication for the namespace. // +optional - Deduplication *bool `json:"deduplication,omitempty"` + Deduplication *bool `json:"deduplication"` // BookieAffinityGroup is the name of the namespace isolation policy to apply to the namespace. BookieAffinityGroup *BookieAffinityGroupData `json:"bookieAffinityGroup,omitempty"` @@ -309,17 +309,17 @@ type PulsarNamespaceSpec struct { // IsAllowAutoUpdateSchema specifies whether to allow automatic schema updates. // When enabled, producers can automatically update schemas without manual approval. // +optional - IsAllowAutoUpdateSchema *bool `json:"isAllowAutoUpdateSchema,omitempty"` + IsAllowAutoUpdateSchema *bool `json:"isAllowAutoUpdateSchema"` // ValidateProducerName specifies whether to validate producer names. // When enabled, producer names must follow specific naming conventions. // +optional - ValidateProducerName *bool `json:"validateProducerName,omitempty"` + ValidateProducerName *bool `json:"validateProducerName"` // EncryptionRequired specifies whether message encryption is required for this namespace. // When enabled, all messages published to topics in this namespace must be encrypted. // +optional - EncryptionRequired *bool `json:"encryptionRequired,omitempty"` + EncryptionRequired *bool `json:"encryptionRequired"` // SubscriptionAuthMode specifies the subscription authentication mode for this namespace. // Valid values are "None" and "Prefix". @@ -370,7 +370,7 @@ type PulsarNamespaceStatus struct { // GeoReplicationEnabled indicates whether geo-replication between two Pulsar instances (via PulsarGeoReplication) // is enabled for the namespace // +optional - GeoReplicationEnabled bool `json:"geoReplicationEnabled,omitempty"` + GeoReplicationEnabled bool `json:"geoReplicationEnabled"` } //+kubebuilder:object:root=true diff --git a/api/v1alpha1/pulsarsink_types.go b/api/v1alpha1/pulsarsink_types.go index 47e48afe..3ba0d9db 100644 --- a/api/v1alpha1/pulsarsink_types.go +++ b/api/v1alpha1/pulsarsink_types.go @@ -42,19 +42,19 @@ type PulsarSinkSpec struct { // CleanupSubscription is the flag to enable or disable the cleanup of subscription // +optional - CleanupSubscription *bool `json:"cleanupSubscription,omitempty"` + CleanupSubscription *bool `json:"cleanupSubscription"` // RetainOrdering is the flag to enable or disable the retain ordering // +optional - RetainOrdering *bool `json:"retainOrdering,omitempty"` + RetainOrdering *bool `json:"retainOrdering"` // RetainKeyOrdering is the flag to enable or disable the retain key ordering // +optional - RetainKeyOrdering *bool `json:"retainKeyOrdering,omitempty"` + RetainKeyOrdering *bool `json:"retainKeyOrdering"` // AutoAck is the flag to enable or disable the auto ack // +optional - AutoAck *bool `json:"autoAck,omitempty"` + AutoAck *bool `json:"autoAck"` // Parallelism is the parallelism of the PulsarSink // +optional diff --git a/api/v1alpha1/pulsartopic_types.go b/api/v1alpha1/pulsartopic_types.go index 5c9b1848..e034561d 100644 --- a/api/v1alpha1/pulsartopic_types.go +++ b/api/v1alpha1/pulsartopic_types.go @@ -40,7 +40,7 @@ type PulsarTopicSpec struct { // Defaults to true if not specified. // +kubebuilder:default=true // +optional - Persistent *bool `json:"persistent,omitempty"` + Persistent *bool `json:"persistent"` // Partitions specifies the number of partitions for a partitioned topic. // Set to 0 for a non-partitioned topic. @@ -141,7 +141,7 @@ type PulsarTopicSpec struct { // Deduplication controls whether to enable message deduplication for the topic. // +optional - Deduplication *bool `json:"deduplication,omitempty"` + Deduplication *bool `json:"deduplication"` // CompactionThreshold specifies the size threshold in bytes for automatic topic compaction. // When the topic reaches this size, compaction will be triggered automatically. @@ -194,7 +194,7 @@ type PulsarTopicSpec struct { // SchemaValidationEnforced determines whether schema validation is enforced for the topic. // When enabled, only messages that conform to the topic's schema will be accepted. // +optional - SchemaValidationEnforced *bool `json:"schemaValidationEnforced,omitempty"` + SchemaValidationEnforced *bool `json:"schemaValidationEnforced"` // SubscriptionDispatchRate defines the message dispatch rate limiting policy for subscriptions. // This controls the rate at which messages are delivered to consumers per subscription. @@ -237,7 +237,7 @@ type PulsarTopicSpec struct { type DelayedDeliveryData struct { // Active determines whether delayed delivery is enabled for the topic // +optional - Active *bool `json:"active,omitempty"` + Active *bool `json:"active"` // TickTimeMillis specifies the tick time for delayed message delivery in milliseconds // +optional @@ -288,7 +288,7 @@ type OffloadPolicies struct { // This is a local type definition that mirrors the external library's AutoSubscriptionCreationOverride // to ensure proper Kubernetes deep copy generation. type AutoSubscriptionCreationOverride struct { - AllowAutoSubscriptionCreation bool `json:"allowAutoSubscriptionCreation,omitempty"` + AllowAutoSubscriptionCreation bool `json:"allowAutoSubscriptionCreation"` } // SchemaCompatibilityStrategy defines the schema compatibility strategy for a topic. @@ -321,7 +321,7 @@ type PulsarTopicStatus struct { // GeoReplicationEnabled indicates whether geo-replication is enabled for this topic. // This is set to true when GeoReplicationRefs are configured in the spec and successfully applied. // +optional - GeoReplicationEnabled bool `json:"geoReplicationEnabled,omitempty"` + GeoReplicationEnabled bool `json:"geoReplicationEnabled"` } //+kubebuilder:object:root=true diff --git a/controllers/pulsartopic_boolean_persistence_test.go b/controllers/pulsartopic_boolean_persistence_test.go new file mode 100644 index 00000000..8a5f4be5 --- /dev/null +++ b/controllers/pulsartopic_boolean_persistence_test.go @@ -0,0 +1,84 @@ +// Copyright 2026 StreamNative +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controllers + +import ( + "context" + "encoding/json" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" +) + +var _ = Describe("PulsarTopic Boolean Field Persistence", func() { + Context("when PulsarTopic status has GeoReplicationEnabled set to false", func() { + It("should preserve false value in JSON serialization", func() { + ctx := context.Background() + namespace := "default" + topicName := "test-geo-replication-false" + connectionName := "test-connection" + + connection := &resourcev1alpha1.PulsarConnection{ + ObjectMeta: metav1.ObjectMeta{ + Name: connectionName, + Namespace: namespace, + }, + Spec: resourcev1alpha1.PulsarConnectionSpec{ + AdminServiceURL: "http://localhost:8080", + BrokerServiceURL: "pulsar://localhost:6650", + }, + } + Expect(k8sClient.Create(ctx, connection)).Should(Succeed()) + + topic := &resourcev1alpha1.PulsarTopic{ + ObjectMeta: metav1.ObjectMeta{ + Name: topicName, + Namespace: namespace, + }, + Spec: resourcev1alpha1.PulsarTopicSpec{ + Name: "persistent://public/default/" + topicName, + ConnectionRef: corev1.LocalObjectReference{ + Name: connectionName, + }, + }, + Status: resourcev1alpha1.PulsarTopicStatus{ + GeoReplicationEnabled: false, + }, + } + Expect(k8sClient.Create(ctx, topic)).Should(Succeed()) + + // Get the created topic and verify status + createdTopic := &resourcev1alpha1.PulsarTopic{} + topicKey := types.NamespacedName{Name: topicName, Namespace: namespace} + Expect(k8sClient.Get(ctx, topicKey, createdTopic)).Should(Succeed()) + Expect(createdTopic.Status.GeoReplicationEnabled).Should(Equal(false)) + + // Verify the serialization + statusJSON, err := json.Marshal(createdTopic.Status) + Expect(err).Should(Succeed()) + var statusMap map[string]interface{} + Expect(json.Unmarshal(statusJSON, &statusMap)).Should(Succeed()) + Expect(statusMap).Should(HaveKey("geoReplicationEnabled")) + geoRepValue, exists := statusMap["geoReplicationEnabled"] + Expect(exists).Should(BeTrue()) + Expect(geoRepValue).Should(Equal(false)) + }) + }) +})