Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/v1alpha1/apikey_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type APIKeySpec struct {

// Revoke indicates whether this API key should be revoked
// +optional
Revoke bool `json:"revoke,omitempty"`
Revoke bool `json:"revoke"`

// EncryptionKey contains the public key used to encrypt the token
// +optional
Expand Down
6 changes: 3 additions & 3 deletions api/v1alpha1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ type ProducerConfig struct {
MaxPendingMessagesAcrossPartitions int `json:"maxPendingMessagesAcrossPartitions,omitempty" yaml:"maxPendingMessagesAcrossPartitions"`

// +optional
UseThreadLocalProducers bool `json:"useThreadLocalProducers,omitempty" yaml:"useThreadLocalProducers"`
UseThreadLocalProducers bool `json:"useThreadLocalProducers" yaml:"useThreadLocalProducers"`

// +optional
CryptoConfig *CryptoConfig `json:"cryptoConfig,omitempty" yaml:"cryptoConfig"`
Expand All @@ -191,7 +191,7 @@ type ConsumerConfig struct {
SerdeClassName string `json:"serdeClassName,omitempty" yaml:"serdeClassName"`

// +optional
RegexPattern bool `json:"regexPattern,omitempty" yaml:"regexPattern"`
RegexPattern bool `json:"regexPattern" yaml:"regexPattern"`

// +optional
ReceiverQueueSize int `json:"receiverQueueSize,omitempty" yaml:"receiverQueueSize"`
Expand All @@ -206,7 +206,7 @@ type ConsumerConfig struct {
CryptoConfig *CryptoConfig `json:"cryptoConfig,omitempty" yaml:"cryptoConfig"`

// +optional
PoolMessages bool `json:"poolMessages,omitempty" yaml:"poolMessages"`
PoolMessages bool `json:"poolMessages" yaml:"poolMessages"`
}

// CryptoConfig represents the configuration for the crypto of the pulsar functions and connectors
Expand Down
2 changes: 1 addition & 1 deletion api/v1alpha1/computeflinkdeployment_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ type ComputeFlinkDeploymentList struct {

// VvpRestoreStrategy defines the restore strategy of the deployment
type VvpRestoreStrategy struct {
AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"`
AllowNonRestoredState bool `json:"allowNonRestoredState"`
Kind string `json:"kind,omitempty"`
}

Expand Down
4 changes: 2 additions & 2 deletions api/v1alpha1/pulsarconnection_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ type PulsarConnectionSpec struct {
// TLSEnableHostnameVerification indicates whether to verify the hostname of the broker.
// Only used when using secure urls.
// +optional
TLSEnableHostnameVerification bool `json:"tlsEnableHostnameVerification,omitempty"`
TLSEnableHostnameVerification bool `json:"tlsEnableHostnameVerification"`

// TLSAllowInsecureConnection indicates whether to allow insecure connection to the broker.
// +optional
TLSAllowInsecureConnection bool `json:"tlsAllowInsecureConnection,omitempty"`
TLSAllowInsecureConnection bool `json:"tlsAllowInsecureConnection"`

// TLSTrustCertsFilePath Path for the TLS certificate used to validate the broker endpoint when using TLS.
// +optional
Expand Down
4 changes: 2 additions & 2 deletions api/v1alpha1/pulsarnamespace_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// TopicAutoCreationConfig defines the configuration for automatic topic creation
type TopicAutoCreationConfig struct {
// Allow specifies whether to allow automatic topic creation
Allow bool `json:"allow,omitempty"`
Allow bool `json:"allow"`

// Type specifies the type of automatically created topics
// +kubebuilder:validation:Enum=partitioned;non-partitioned
Expand Down Expand Up @@ -375,7 +375,7 @@ type PulsarNamespaceStatus struct {
// GeoReplicationEnabled indicates whether geo-replication between two Pulsar instances (via PulsarGeoReplication)
// is enabled for the namespace
// +optional
GeoReplicationEnabled bool `json:"geoReplicationEnabled,omitempty"`
GeoReplicationEnabled bool `json:"geoReplicationEnabled"`
}

//+kubebuilder:object:root=true
Expand Down
4 changes: 2 additions & 2 deletions api/v1alpha1/pulsartopic_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ type OffloadPolicies struct {
// This is a local type definition that mirrors the external library's AutoSubscriptionCreationOverride
// to ensure proper Kubernetes deep copy generation.
type AutoSubscriptionCreationOverride struct {
AllowAutoSubscriptionCreation bool `json:"allowAutoSubscriptionCreation,omitempty"`
AllowAutoSubscriptionCreation bool `json:"allowAutoSubscriptionCreation"`
}

// SchemaCompatibilityStrategy defines the schema compatibility strategy for a topic.
Expand Down Expand Up @@ -340,7 +340,7 @@ type PulsarTopicStatus struct {
// GeoReplicationEnabled indicates whether geo-replication is enabled for this topic.
// This is set to true when GeoReplicationRefs are configured in the spec and successfully applied.
// +optional
GeoReplicationEnabled bool `json:"geoReplicationEnabled,omitempty"`
GeoReplicationEnabled bool `json:"geoReplicationEnabled"`
}

//+kubebuilder:object:root=true
Expand Down
84 changes: 84 additions & 0 deletions controllers/pulsartopic_boolean_persistence_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2026 StreamNative
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controllers

import (
"context"
"encoding/json"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1"
)

var _ = Describe("PulsarTopic Boolean Field Persistence", func() {
Context("when PulsarTopic status has GeoReplicationEnabled set to false", func() {
It("should preserve false value in JSON serialization", func() {
ctx := context.Background()
Comment on lines +30 to +33
namespace := "default"
topicName := "test-geo-replication-false"
connectionName := "test-connection"

connection := &resourcev1alpha1.PulsarConnection{
ObjectMeta: metav1.ObjectMeta{
Name: connectionName,
Namespace: namespace,
},
Spec: resourcev1alpha1.PulsarConnectionSpec{
AdminServiceURL: "http://localhost:8080",
BrokerServiceURL: "pulsar://localhost:6650",
},
}
Expect(k8sClient.Create(ctx, connection)).Should(Succeed())

topic := &resourcev1alpha1.PulsarTopic{
ObjectMeta: metav1.ObjectMeta{
Name: topicName,
Namespace: namespace,
},
Spec: resourcev1alpha1.PulsarTopicSpec{
Name: "persistent://public/default/" + topicName,
ConnectionRef: corev1.LocalObjectReference{
Name: connectionName,
},
},
Status: resourcev1alpha1.PulsarTopicStatus{
GeoReplicationEnabled: false,
},
}
Expect(k8sClient.Create(ctx, topic)).Should(Succeed())

// Get the created topic and verify status
createdTopic := &resourcev1alpha1.PulsarTopic{}
topicKey := types.NamespacedName{Name: topicName, Namespace: namespace}
Expect(k8sClient.Get(ctx, topicKey, createdTopic)).Should(Succeed())
Expect(createdTopic.Status.GeoReplicationEnabled).Should(Equal(false))

// Verify the serialization
statusJSON, err := json.Marshal(createdTopic.Status)
Expect(err).Should(Succeed())
var statusMap map[string]interface{}
Expect(json.Unmarshal(statusJSON, &statusMap)).Should(Succeed())
Expect(statusMap).Should(HaveKey("geoReplicationEnabled"))
geoRepValue, exists := statusMap["geoReplicationEnabled"]
Expect(exists).Should(BeTrue())
Expect(geoRepValue).Should(Equal(false))
})
})
})
2 changes: 1 addition & 1 deletion pkg/reconciler/stateful_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type StateChangeOperations struct {

// ContextChanged indicates that the context (like resource type or name) has changed
// and requires special handling like cleanup of old context
ContextChanged bool `json:"contextChanged,omitempty"`
ContextChanged bool `json:"contextChanged"`

// PreviousContext contains the previous context information for cleanup
PreviousContext interface{} `json:"previousContext,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type APIKeySpec struct {
// Revoke is a boolean that defines if the token of this API key should be revoked.
// +kubebuilder:default=false
// +optional
Revoke bool `json:"revoke,omitempty" protobuf:"bytes,5,opt,name=revoke"`
Revoke bool `json:"revoke" protobuf:"bytes,5,opt,name=revoke"`

// EncryptionKey is a public key to encrypt the API Key token.
// Please provide an RSA key with modulus length of at least 2048 bits.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ type VvpDeploymentDetails struct {

// VvpRestoreStrategy defines the restore strategy of the deployment
type VvpRestoreStrategy struct {
AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty" protobuf:"varint,1,opt,name=allowNonRestoredState"`
AllowNonRestoredState bool `json:"allowNonRestoredState" protobuf:"varint,1,opt,name=allowNonRestoredState"`

Kind string `json:"kind,omitempty" protobuf:"bytes,2,opt,name=kind"`
}
Expand Down