diff --git a/api/v1alpha1/pulsarnamespace_types.go b/api/v1alpha1/pulsarnamespace_types.go index 2ddd8196..b4a365f8 100644 --- a/api/v1alpha1/pulsarnamespace_types.go +++ b/api/v1alpha1/pulsarnamespace_types.go @@ -209,6 +209,7 @@ type PulsarNamespaceSpec struct { // BacklogQuotaRetentionPolicy specifies the retention policy for messages when backlog quota is exceeded. // Valid values are "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". + // +kubebuilder:validation:Enum=producer_request_hold;producer_exception;consumer_backlog_eviction // +optional BacklogQuotaRetentionPolicy *string `json:"backlogQuotaRetentionPolicy,omitempty"` diff --git a/api/v1alpha1/pulsartopic_types.go b/api/v1alpha1/pulsartopic_types.go index e14e7ddb..5c9b1848 100644 --- a/api/v1alpha1/pulsartopic_types.go +++ b/api/v1alpha1/pulsartopic_types.go @@ -111,9 +111,16 @@ type PulsarTopicSpec struct { // BacklogQuotaRetentionPolicy specifies the retention policy for messages when backlog quota is exceeded. // Valid values are "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". + // +kubebuilder:validation:Enum=producer_request_hold;producer_exception;consumer_backlog_eviction // +optional BacklogQuotaRetentionPolicy *string `json:"backlogQuotaRetentionPolicy,omitempty"` + // BacklogQuotaType controls how the backlog quota is enforced. + // "destination_storage" limits backlog by size (in bytes), while "message_age" limits by time. + // +kubebuilder:validation:Enum=destination_storage;message_age + // +optional + BacklogQuotaType *string `json:"backlogQuotaType,omitempty"` + // SchemaInfo defines the schema for the topic, if any. // +optional SchemaInfo *SchemaInfo `json:"schemaInfo,omitempty"` diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 42fa91ab..8db33293 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -3008,6 +3008,11 @@ func (in *PulsarTopicSpec) DeepCopyInto(out *PulsarTopicSpec) { *out = new(string) **out = **in } + if in.BacklogQuotaType != nil { + in, out := &in.BacklogQuotaType, &out.BacklogQuotaType + *out = new(string) + **out = **in + } if in.SchemaInfo != nil { in, out := &in.SchemaInfo, &out.SchemaInfo *out = new(SchemaInfo) diff --git a/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarnamespaces.yaml b/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarnamespaces.yaml index b9dbd19b..9cb7eede 100644 --- a/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarnamespaces.yaml +++ b/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarnamespaces.yaml @@ -99,6 +99,10 @@ spec: description: |- BacklogQuotaRetentionPolicy specifies the retention policy for messages when backlog quota is exceeded. Valid values are "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". + enum: + - producer_request_hold + - producer_exception + - consumer_backlog_eviction type: string backlogQuotaType: description: |- diff --git a/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsartopics.yaml b/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsartopics.yaml index 73a5a0b2..f4751e84 100644 --- a/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsartopics.yaml +++ b/charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsartopics.yaml @@ -105,6 +105,18 @@ spec: description: |- BacklogQuotaRetentionPolicy specifies the retention policy for messages when backlog quota is exceeded. Valid values are "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". + enum: + - producer_request_hold + - producer_exception + - consumer_backlog_eviction + type: string + backlogQuotaType: + description: |- + BacklogQuotaType controls how the backlog quota is enforced. + "destination_storage" limits backlog by size (in bytes), while "message_age" limits by time. + enum: + - destination_storage + - message_age type: string compactionThreshold: description: |- diff --git a/config/crd/bases/resource.streamnative.io_pulsarnamespaces.yaml b/config/crd/bases/resource.streamnative.io_pulsarnamespaces.yaml index b9dbd19b..9cb7eede 100644 --- a/config/crd/bases/resource.streamnative.io_pulsarnamespaces.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsarnamespaces.yaml @@ -99,6 +99,10 @@ spec: description: |- BacklogQuotaRetentionPolicy specifies the retention policy for messages when backlog quota is exceeded. Valid values are "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". + enum: + - producer_request_hold + - producer_exception + - consumer_backlog_eviction type: string backlogQuotaType: description: |- diff --git a/config/crd/bases/resource.streamnative.io_pulsartopics.yaml b/config/crd/bases/resource.streamnative.io_pulsartopics.yaml index 73a5a0b2..f4751e84 100644 --- a/config/crd/bases/resource.streamnative.io_pulsartopics.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsartopics.yaml @@ -105,6 +105,18 @@ spec: description: |- BacklogQuotaRetentionPolicy specifies the retention policy for messages when backlog quota is exceeded. Valid values are "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". + enum: + - producer_request_hold + - producer_exception + - consumer_backlog_eviction + type: string + backlogQuotaType: + description: |- + BacklogQuotaType controls how the backlog quota is enforced. + "destination_storage" limits backlog by size (in bytes), while "message_age" limits by time. + enum: + - destination_storage + - message_age type: string compactionThreshold: description: |- diff --git a/config/default/manager_auth_proxy_patch.yaml b/config/default/manager_auth_proxy_patch.yaml index 4b87bf2b..caf1b62c 100644 --- a/config/default/manager_auth_proxy_patch.yaml +++ b/config/default/manager_auth_proxy_patch.yaml @@ -37,10 +37,10 @@ spec: resources: limits: cpu: 500m - memory: 128Mi + memory: 512Mi requests: cpu: 5m - memory: 64Mi + memory: 128Mi - name: manager args: - "--health-probe-bind-address=:8081" diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml index a4bbb270..a9fd22c4 100644 --- a/config/manager/manager.yaml +++ b/config/manager/manager.yaml @@ -67,9 +67,9 @@ spec: resources: limits: cpu: 500m - memory: 128Mi + memory: 512Mi requests: cpu: 10m - memory: 64Mi + memory: 128Mi serviceAccountName: controller-manager terminationGracePeriodSeconds: 10 diff --git a/docs/pulsar_namespace.md b/docs/pulsar_namespace.md index 966037d6..02d25ead 100644 --- a/docs/pulsar_namespace.md +++ b/docs/pulsar_namespace.md @@ -20,7 +20,7 @@ The `PulsarNamespace` resource defines a namespace in a Pulsar cluster. It allow | `retentionSize` | Maximum size of backlog retained in the namespace. Should be set in conjunction with RetentionTime for effective retention policy. Use "-1" for infinite retention size. | No | | `backlogQuotaLimitTime` | Time limit for message backlog. Messages older than this limit will be removed or handled according to the retention policy. | No | | `backlogQuotaLimitSize` | Size limit for message backlog. When the limit is reached, older messages will be removed or handled according to the retention policy. | No | -| `backlogQuotaRetentionPolicy` | Retention policy for messages when backlog quota is exceeded. Options: "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". | No | +| `backlogQuotaRetentionPolicy` | Retention policy for messages when backlog quota is exceeded. Options: "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". **Required whenever backlogQuotaLimitTime or backlogQuotaLimitSize is set.** | Conditional | | `backlogQuotaType` | Controls how the backlog quota is enforced. Options: "destination_storage" (limits backlog by size in bytes), "message_age" (limits by time). | No | | `offloadThresholdTime` | Time limit for message offloading. Messages older than this limit will be offloaded to the tiered storage. | No | | `offloadThresholdSize` | Size limit for message offloading. When the limit is reached, older messages will be offloaded to the tiered storage. | No | @@ -450,6 +450,8 @@ spec: name: test-pulsar-connection backlogQuotaLimitSize: 1Gi backlogQuotaLimitTime: 24h + backlogQuotaRetentionPolicy: producer_request_hold + # backlogQuotaType: destination_storage bundles: 16 messageTTL: 1h diff --git a/docs/pulsar_topic.md b/docs/pulsar_topic.md index f465082c..3d644020 100644 --- a/docs/pulsar_topic.md +++ b/docs/pulsar_topic.md @@ -23,7 +23,7 @@ The `PulsarTopic` resource defines a topic in a Pulsar cluster. It allows you to | `retentionSize` | Maximum size of backlog retained in the topic. Should be set in conjunction with retentionTime for effective retention policy. Use "-1" for infinite retention size. | No | | `backlogQuotaLimitTime` | Time limit for message backlog. Messages older than this limit will be removed or handled according to the retention policy. | No | | `backlogQuotaLimitSize` | Size limit for message backlog. When the limit is reached, older messages will be removed or handled according to the retention policy. | No | -| `backlogQuotaRetentionPolicy` | Retention policy for messages when backlog quota is exceeded. Options: "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". | No | +| `backlogQuotaRetentionPolicy` | Retention policy for messages when backlog quota is exceeded. Options: "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". **Required whenever backlogQuotaLimitTime or backlogQuotaLimitSize is set.** | Conditional | | `lifecyclePolicy` | Determines whether to keep or delete the Pulsar topic when the Kubernetes resource is deleted. Options: `CleanUpAfterDeletion`, `KeepAfterDeletion`. Default is `CleanUpAfterDeletion`. | No | | `schemaInfo` | Schema information for the topic. See [schemaInfo](#schemainfo) for more details. | No | | `geoReplicationRefs` | List of references to PulsarGeoReplication resources, used to enable geo-replication at the topic level. | No | diff --git a/pkg/admin/impl.go b/pkg/admin/impl.go index c71a4ea4..88806cf0 100644 --- a/pkg/admin/impl.go +++ b/pkg/admin/impl.go @@ -306,29 +306,14 @@ func (p *PulsarAdminClient) applyTopicPolicies(topicName *utils.TopicName, param retentionPolicy = &policy } - var backlogQuotaPolicy *utils.BacklogQuota - var backlogQuotaType utils.BacklogQuotaType - if (params.BacklogQuotaLimitTime != nil || params.BacklogQuotaLimitSize != nil) && - params.BacklogQuotaRetentionPolicy != nil { - backlogTime := int64(-1) - backlogSize := int64(-1) - if params.BacklogQuotaLimitTime != nil { - t, err := params.BacklogQuotaLimitTime.Parse() - if err != nil { - return err - } - backlogTime = int64(t.Seconds()) - backlogQuotaType = utils.MessageAge - } - if params.BacklogQuotaLimitSize != nil { - backlogSize = params.BacklogQuotaLimitSize.Value() - backlogQuotaType = utils.DestinationStorage - } - backlogQuotaPolicy = &utils.BacklogQuota{ - LimitTime: backlogTime, - LimitSize: backlogSize, - Policy: utils.RetentionPolicy(*params.BacklogQuotaRetentionPolicy), - } + backlogQuotaPolicy, backlogQuotaType, err := buildBacklogQuota( + params.BacklogQuotaLimitTime, + params.BacklogQuotaLimitSize, + params.BacklogQuotaRetentionPolicy, + params.BacklogQuotaType, + ) + if err != nil { + return err } switch { @@ -619,6 +604,60 @@ func isRetentionBacklogOrderingError(err error) bool { return strings.Contains(err.Error(), "Retention Quota must exceed configured backlog quota") } +func buildBacklogQuota(limitTime *rutils.Duration, limitSize *resource.Quantity, retentionPolicyStr *string, + backlogQuotaTypeStr *string) (*utils.BacklogQuota, utils.BacklogQuotaType, error) { + if limitTime == nil && limitSize == nil && retentionPolicyStr == nil && backlogQuotaTypeStr == nil { + return nil, "", nil + } + + if retentionPolicyStr == nil { + return nil, "", fmt.Errorf("backlogQuotaRetentionPolicy is required when configuring backlog quota") + } + if limitTime == nil && limitSize == nil { + return nil, "", fmt.Errorf("backlogQuotaLimitTime or backlogQuotaLimitSize is required when configuring backlog quota") + } + retentionPolicy, err := utils.ParseRetentionPolicy(*retentionPolicyStr) + if err != nil { + return nil, "", err + } + + backlogQuotaType := utils.DestinationStorage + if backlogQuotaTypeStr != nil { + parsedType, err := utils.ParseBacklogQuotaType(*backlogQuotaTypeStr) + if err != nil { + return nil, "", err + } + backlogQuotaType = parsedType + } + + backlogQuota := utils.BacklogQuota{ + LimitTime: -1, + LimitSize: -1, + Policy: retentionPolicy, + } + + switch backlogQuotaType { + case utils.DestinationStorage: + if limitSize == nil { + return nil, "", fmt.Errorf("backlogQuotaLimitSize is required when backlogQuotaType is %s", utils.DestinationStorage) + } + backlogQuota.LimitSize = limitSize.Value() + case utils.MessageAge: + if limitTime == nil { + return nil, "", fmt.Errorf("backlogQuotaLimitTime is required when backlogQuotaType is %s", utils.MessageAge) + } + t, err := limitTime.Parse() + if err != nil { + return nil, "", err + } + backlogQuota.LimitTime = int64(t.Seconds()) + default: + return nil, "", fmt.Errorf("unsupported backlog quota type %s", backlogQuotaType) + } + + return &backlogQuota, backlogQuotaType, nil +} + // GetTopicClusters get the assigned clusters of the topic to the local default cluster func (p *PulsarAdminClient) GetTopicClusters(name string, persistent *bool) ([]string, error) { completeTopicName := MakeCompleteTopicName(name, persistent) @@ -1117,35 +1156,17 @@ func (p *PulsarAdminClient) applyNamespacePolicies(completeNSName string, params } } - if (params.BacklogQuotaLimitTime != nil || params.BacklogQuotaLimitSize != nil) && - params.BacklogQuotaRetentionPolicy != nil { - backlogTime := int64(-1) - backlogSize := int64(-1) - if params.BacklogQuotaLimitTime != nil { - t, err := params.BacklogQuotaLimitTime.Parse() - if err != nil { - return err - } - backlogTime = int64(t.Seconds()) - } - if params.BacklogQuotaLimitSize != nil { - backlogSize = params.BacklogQuotaLimitSize.Value() - } - backlogQuotaPolicy := utils.BacklogQuota{ - LimitTime: backlogTime, - LimitSize: backlogSize, - Policy: utils.RetentionPolicy(*params.BacklogQuotaRetentionPolicy), - } - - var backlogQuotaType utils.BacklogQuotaType - if params.BacklogQuotaType != nil { - backlogQuotaType, err = utils.ParseBacklogQuotaType(*params.BacklogQuotaType) - if err != nil { - return err - } - } - err = p.adminClient.Namespaces().SetBacklogQuota(completeNSName, backlogQuotaPolicy, backlogQuotaType) - if err != nil { + backlogQuotaPolicy, backlogQuotaType, err := buildBacklogQuota( + params.BacklogQuotaLimitTime, + params.BacklogQuotaLimitSize, + params.BacklogQuotaRetentionPolicy, + params.BacklogQuotaType, + ) + if err != nil { + return err + } + if backlogQuotaPolicy != nil { + if err := p.adminClient.Namespaces().SetBacklogQuota(completeNSName, *backlogQuotaPolicy, backlogQuotaType); err != nil { return err } } @@ -1188,6 +1209,39 @@ func (p *PulsarAdminClient) applyNamespacePolicies(completeNSName string, params return err } } + // Handle persistence policies + if params.PersistencePolicies != nil { + var markDeleteRate float64 + if params.PersistencePolicies.ManagedLedgerMaxMarkDeleteRate != nil { + var err error + markDeleteRate, err = strconv.ParseFloat(*params.PersistencePolicies.ManagedLedgerMaxMarkDeleteRate, 64) + if err != nil { + return err + } + } + + persistenceData := utils.PersistencePolicies{ + ManagedLedgerMaxMarkDeleteRate: markDeleteRate, + } + if params.PersistencePolicies.BookkeeperEnsemble != nil { + persistenceData.BookkeeperEnsemble = int(*params.PersistencePolicies.BookkeeperEnsemble) + } + if params.PersistencePolicies.BookkeeperWriteQuorum != nil { + persistenceData.BookkeeperWriteQuorum = int(*params.PersistencePolicies.BookkeeperWriteQuorum) + } + if params.PersistencePolicies.BookkeeperAckQuorum != nil { + persistenceData.BookkeeperAckQuorum = int(*params.PersistencePolicies.BookkeeperAckQuorum) + } + + err = p.adminClient.Namespaces().SetPersistence(completeNSName, persistenceData) + if err != nil { + return err + } + } + // Note: When PersistencePolicies is nil, we don't call SetPersistence. + // The pulsar-client-go library doesn't have DeletePersistence for namespaces, + // and sending empty PersistencePolicies{} with BookkeeperEnsemble=0 causes + // validation errors (Bookkeeper-Ensemble must be > 0 and <= 5). if params.BookieAffinityGroup != nil { err = p.adminClient.Namespaces().SetBookieAffinityGroup(completeNSName, utils.BookieAffinityGroupData{ BookkeeperAffinityGroupPrimary: params.BookieAffinityGroup.BookkeeperAffinityGroupPrimary, @@ -1243,6 +1297,30 @@ func (p *PulsarAdminClient) applyNamespacePolicies(completeNSName string, params } } + // Handle inactive topic policies + if params.InactiveTopicPolicies != nil { + inactiveTopicPolicies := utils.InactiveTopicPolicies{} + if params.InactiveTopicPolicies.InactiveTopicDeleteMode != nil { + deleteMode := utils.InactiveTopicDeleteMode(*params.InactiveTopicPolicies.InactiveTopicDeleteMode) + inactiveTopicPolicies.InactiveTopicDeleteMode = &deleteMode + } + if params.InactiveTopicPolicies.MaxInactiveDurationInSeconds != nil { + inactiveTopicPolicies.MaxInactiveDurationSeconds = int(*params.InactiveTopicPolicies.MaxInactiveDurationInSeconds) + } + if params.InactiveTopicPolicies.DeleteWhileInactive != nil { + inactiveTopicPolicies.DeleteWhileInactive = *params.InactiveTopicPolicies.DeleteWhileInactive + } + err = p.adminClient.Namespaces().SetInactiveTopicPolicies(*naName, inactiveTopicPolicies) + if err != nil { + return err + } + } else { + err = p.adminClient.Namespaces().RemoveInactiveTopicPolicies(*naName) + if err != nil && !IsNotFound(err) { + return err + } + } + // Handle dispatch rate limiting if params.DispatchRate != nil { rate := utils.DispatchRate{ @@ -1363,6 +1441,25 @@ func (p *PulsarAdminClient) applyNamespacePolicies(completeNSName string, params } } + // Handle subscription expiration time + if params.SubscriptionExpirationTime != nil { + if params.SubscriptionExpirationTime.IsInfinite() { + // Remove explicit expiration to inherit broker defaults + if err := p.adminClient.Namespaces().RemoveSubscriptionExpirationTime(*naName); err != nil { + return err + } + } else { + duration, err := params.SubscriptionExpirationTime.Parse() + if err != nil { + return err + } + expirationMinutes := int(duration.Minutes()) + if err := p.adminClient.Namespaces().SetSubscriptionExpirationTime(*naName, expirationMinutes); err != nil { + return err + } + } + } + // Handle schema auto-update policy if params.IsAllowAutoUpdateSchema != nil { err = p.adminClient.Namespaces().SetIsAllowAutoUpdateSchema(*naName, *params.IsAllowAutoUpdateSchema) @@ -1379,6 +1476,17 @@ func (p *PulsarAdminClient) applyNamespacePolicies(completeNSName string, params } } + // Handle namespace properties + if len(params.Properties) > 0 { + if err := p.adminClient.Namespaces().UpdateProperties(*naName, params.Properties); err != nil { + return err + } + } else if params.Properties != nil { + if err := p.adminClient.Namespaces().RemoveProperties(*naName); err != nil { + return err + } + } + // Handle encryption requirement if params.EncryptionRequired != nil { err = p.adminClient.Namespaces().SetEncryptionRequiredStatus(*naName, *params.EncryptionRequired) diff --git a/pkg/admin/interface.go b/pkg/admin/interface.go index 81d6d8f9..2eb1e72b 100644 --- a/pkg/admin/interface.go +++ b/pkg/admin/interface.go @@ -90,6 +90,7 @@ type TopicParams struct { BacklogQuotaLimitTime *utils.Duration BacklogQuotaLimitSize *resource.Quantity BacklogQuotaRetentionPolicy *string + BacklogQuotaType *string ReplicationClusters []string Deduplication *bool CompactionThreshold *int64 diff --git a/pkg/connection/reconcile_topic.go b/pkg/connection/reconcile_topic.go index cd4878e6..d27522b9 100644 --- a/pkg/connection/reconcile_topic.go +++ b/pkg/connection/reconcile_topic.go @@ -327,6 +327,7 @@ func createTopicParams(topic *resourcev1alpha1.PulsarTopic) *admin.TopicParams { BacklogQuotaLimitTime: topic.Spec.BacklogQuotaLimitTime, BacklogQuotaLimitSize: topic.Spec.BacklogQuotaLimitSize, BacklogQuotaRetentionPolicy: topic.Spec.BacklogQuotaRetentionPolicy, + BacklogQuotaType: topic.Spec.BacklogQuotaType, Deduplication: topic.Spec.Deduplication, CompactionThreshold: topic.Spec.CompactionThreshold, PersistencePolicies: topic.Spec.PersistencePolicies, @@ -374,6 +375,7 @@ func summarizeTopicParamsForLogging(params *admin.TopicParams) map[string]interf addIfNotNil(summary, "backlogQuotaLimitTime", params.BacklogQuotaLimitTime) addIfNotNil(summary, "backlogQuotaLimitSize", params.BacklogQuotaLimitSize) addIfNotNil(summary, "backlogQuotaRetentionPolicy", params.BacklogQuotaRetentionPolicy) + addIfNotNil(summary, "backlogQuotaType", params.BacklogQuotaType) addIfNotNil(summary, "deduplication", params.Deduplication) addIfNotNil(summary, "compactionThreshold", params.CompactionThreshold) addIfNotNil(summary, "persistencePolicies", params.PersistencePolicies) diff --git a/tests/operator/resources_test.go b/tests/operator/resources_test.go index d19783e6..e9e4f320 100644 --- a/tests/operator/resources_test.go +++ b/tests/operator/resources_test.go @@ -28,6 +28,7 @@ import ( v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + metautil "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -1710,6 +1711,60 @@ var _ = Describe("Resources", func() { Expect(string(*ns.Spec.SubscriptionExpirationTime)).Should(Equal("7d")) }) + It("should reflect subscription expiration time in broker", func() { + podName := fmt.Sprintf("%s-broker-0", brokerName) + containerName := fmt.Sprintf("%s-broker", brokerName) + Eventually(func(g Gomega) { + stdout, _, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName, + "./bin/pulsar-admin namespaces get-subscription-expiration-time "+storagePoliciesPulsarNSName) + g.Expect(err).Should(Succeed()) + // 7d -> 10080 minutes + g.Expect(stdout).Should(ContainSubstring("10080")) + }, "30s", "200ms").Should(Succeed()) + }) + + It("should update subscription expiration time to infinite", func() { + ns := &v1alphav1.PulsarNamespace{} + tns := types.NamespacedName{Namespace: namespaceName, Name: storagePoliciesNamespaceName} + Expect(k8sClient.Get(ctx, tns, ns)).Should(Succeed()) + + var infiniteExp rutils.Duration = "-1" + ns.Spec.SubscriptionExpirationTime = &infiniteExp + + err := k8sClient.Update(ctx, ns) + Expect(err).Should(Succeed()) + }) + + It("should be ready after subscription expiration update", func() { + Eventually(func() bool { + ns := &v1alphav1.PulsarNamespace{} + tns := types.NamespacedName{Namespace: namespaceName, Name: storagePoliciesNamespaceName} + Expect(k8sClient.Get(ctx, tns, ns)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(ns) + }, "30s", "100ms").Should(BeTrue()) + }) + + It("should reflect updated subscription expiration time", func() { + ns := &v1alphav1.PulsarNamespace{} + tns := types.NamespacedName{Namespace: namespaceName, Name: storagePoliciesNamespaceName} + Expect(k8sClient.Get(ctx, tns, ns)).Should(Succeed()) + + Expect(ns.Spec.SubscriptionExpirationTime).ShouldNot(BeNil()) + Expect(string(*ns.Spec.SubscriptionExpirationTime)).Should(Equal("-1")) + }) + + It("should reflect updated subscription expiration time in broker", func() { + podName := fmt.Sprintf("%s-broker-0", brokerName) + containerName := fmt.Sprintf("%s-broker", brokerName) + Eventually(func(g Gomega) { + stdout, _, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName, + "./bin/pulsar-admin namespaces get-subscription-expiration-time "+storagePoliciesPulsarNSName) + g.Expect(err).Should(Succeed()) + // Pulsar returns null once the expiration time is removed (infinite) + g.Expect(stdout).Should(ContainSubstring("null")) + }, "30s", "200ms").Should(Succeed()) + }) + It("should have correct custom properties", func() { ns := &v1alphav1.PulsarNamespace{} tns := types.NamespacedName{Namespace: namespaceName, Name: storagePoliciesNamespaceName} @@ -1721,13 +1776,70 @@ var _ = Describe("Resources", func() { Expect(ns.Spec.Properties["team"]).Should(Equal("qa")) }) + It("should surface an error condition when backlog quota retention policy is unset", func() { + ns := &v1alphav1.PulsarNamespace{} + tns := types.NamespacedName{Namespace: namespaceName, Name: storagePoliciesNamespaceName} + Expect(k8sClient.Get(ctx, tns, ns)).Should(Succeed()) + + ns.Spec.BacklogQuotaRetentionPolicy = nil + + Expect(k8sClient.Update(ctx, ns)).Should(Succeed()) + + Eventually(func(g Gomega) metav1.ConditionStatus { + current := &v1alphav1.PulsarNamespace{} + g.Expect(k8sClient.Get(ctx, tns, current)).Should(Succeed()) + + cond := metautil.FindStatusCondition(current.Status.Conditions, string(v1alphav1.ConditionReady)) + if cond == nil { + return metav1.ConditionUnknown + } + g.Expect(cond.Message).Should(ContainSubstring("backlogQuotaRetentionPolicy is required")) + return cond.Status + }, "30s", "500ms").Should(Equal(metav1.ConditionFalse)) + + // Re-fetch to get latest ResourceVersion before updating + Expect(k8sClient.Get(ctx, tns, ns)).Should(Succeed()) + ns.Spec.BacklogQuotaRetentionPolicy = pointer.String("producer_request_hold") + Expect(k8sClient.Update(ctx, ns)).Should(Succeed()) + }) + + It("should allow clearing inactive topic policies and properties", func() { + ns := &v1alphav1.PulsarNamespace{} + tns := types.NamespacedName{Namespace: namespaceName, Name: storagePoliciesNamespaceName} + Expect(k8sClient.Get(ctx, tns, ns)).Should(Succeed()) + + ns.Spec.InactiveTopicPolicies = nil + ns.Spec.Properties = nil + + err := k8sClient.Update(ctx, ns) + Expect(err).Should(Succeed()) + }) + + It("should be ready after clearing inactive topic policies and properties", func() { + Eventually(func() bool { + ns := &v1alphav1.PulsarNamespace{} + tns := types.NamespacedName{Namespace: namespaceName, Name: storagePoliciesNamespaceName} + Expect(k8sClient.Get(ctx, tns, ns)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(ns) + }, "30s", "100ms").Should(BeTrue()) + }) + + It("should reflect cleared inactive topic policies and properties", func() { + ns := &v1alphav1.PulsarNamespace{} + tns := types.NamespacedName{Namespace: namespaceName, Name: storagePoliciesNamespaceName} + Expect(k8sClient.Get(ctx, tns, ns)).Should(Succeed()) + + Expect(ns.Spec.InactiveTopicPolicies).Should(BeNil()) + Expect(ns.Spec.Properties).Should(BeNil()) + }) + It("should update persistence policies successfully", func() { ns := &v1alphav1.PulsarNamespace{} tns := types.NamespacedName{Namespace: namespaceName, Name: storagePoliciesNamespaceName} Expect(k8sClient.Get(ctx, tns, ns)).Should(Succeed()) // Update PersistencePolicies - ns.Spec.PersistencePolicies.BookkeeperEnsemble = pointer.Int32(7) + ns.Spec.PersistencePolicies.BookkeeperEnsemble = pointer.Int32(5) ns.Spec.PersistencePolicies.BookkeeperWriteQuorum = pointer.Int32(4) ns.Spec.PersistencePolicies.BookkeeperAckQuorum = pointer.Int32(3) @@ -1751,11 +1863,54 @@ var _ = Describe("Resources", func() { // Verify updated PersistencePolicies Expect(ns.Spec.PersistencePolicies).ShouldNot(BeNil()) - Expect(*ns.Spec.PersistencePolicies.BookkeeperEnsemble).Should(Equal(int32(7))) + Expect(*ns.Spec.PersistencePolicies.BookkeeperEnsemble).Should(Equal(int32(5))) Expect(*ns.Spec.PersistencePolicies.BookkeeperWriteQuorum).Should(Equal(int32(4))) Expect(*ns.Spec.PersistencePolicies.BookkeeperAckQuorum).Should(Equal(int32(3))) }) + It("should clear persistence policies successfully", func() { + ns := &v1alphav1.PulsarNamespace{} + tns := types.NamespacedName{Namespace: namespaceName, Name: storagePoliciesNamespaceName} + Expect(k8sClient.Get(ctx, tns, ns)).Should(Succeed()) + + // Clear PersistencePolicies + ns.Spec.PersistencePolicies = nil + + err := k8sClient.Update(ctx, ns) + Expect(err).Should(Succeed()) + }) + + It("should be ready after clearing persistence policies", func() { + Eventually(func() bool { + ns := &v1alphav1.PulsarNamespace{} + tns := types.NamespacedName{Namespace: namespaceName, Name: storagePoliciesNamespaceName} + Expect(k8sClient.Get(ctx, tns, ns)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(ns) + }, "30s", "100ms").Should(BeTrue()) + }) + + It("should have cleared persistence policies", func() { + ns := &v1alphav1.PulsarNamespace{} + tns := types.NamespacedName{Namespace: namespaceName, Name: storagePoliciesNamespaceName} + Expect(k8sClient.Get(ctx, tns, ns)).Should(Succeed()) + + // Verify PersistencePolicies is cleared + Expect(ns.Spec.PersistencePolicies).Should(BeNil()) + }) + + // TODO: https://github.com/apache/pulsar-client-go/pull/1447 add RemovePersistence methods + // It("should reflect cleared persistence policies in broker", func() { + // podName := fmt.Sprintf("%s-broker-0", brokerName) + // containerName := fmt.Sprintf("%s-broker", brokerName) + // Eventually(func(g Gomega) { + // stdout, _, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName, + // "./bin/pulsar-admin namespaces get-persistence "+storagePoliciesPulsarNSName) + // g.Expect(err).Should(Succeed()) + // // When persistence policies are reset, broker returns default values (0) + // g.Expect(stdout).Should(ContainSubstring("\"bookkeeperEnsemble\" : 0")) + // }, "30s", "200ms").Should(Succeed()) + // }) + It("should update compaction threshold successfully", func() { ns := &v1alphav1.PulsarNamespace{} tns := types.NamespacedName{Namespace: namespaceName, Name: storagePoliciesNamespaceName} diff --git a/tests/utils/spec.go b/tests/utils/spec.go index 5f60157f..79d2f52b 100644 --- a/tests/utils/spec.go +++ b/tests/utils/spec.go @@ -67,6 +67,8 @@ func MakePulsarNamespace(namespace, name, namespaceName, connectionName string, var du rsutils.Duration = "1d" limitTime := &du ttl := &du + backlogPolicy := ptr.To("producer_request_hold") + backlogType := ptr.To("destination_storage") return &v1alpha1.PulsarNamespace{ ObjectMeta: metav1.ObjectMeta{ Namespace: namespace, @@ -77,10 +79,12 @@ func MakePulsarNamespace(namespace, name, namespaceName, connectionName string, ConnectionRef: corev1.LocalObjectReference{ Name: connectionName, }, - BacklogQuotaLimitTime: limitTime, - BacklogQuotaLimitSize: &backlogSize, - Bundles: &bundle, - MessageTTL: ttl, + BacklogQuotaLimitTime: limitTime, + BacklogQuotaLimitSize: &backlogSize, + BacklogQuotaRetentionPolicy: backlogPolicy, + BacklogQuotaType: backlogType, + Bundles: &bundle, + MessageTTL: ttl, TopicAutoCreationConfig: &v1alpha1.TopicAutoCreationConfig{ Allow: true, Type: "partitioned", @@ -97,6 +101,8 @@ func MakePulsarNamespaceWithRateLimiting(namespace, name, namespaceName, connect var du rsutils.Duration = "12h" limitTime := &du ttl := &du + backlogPolicy := ptr.To("producer_request_hold") + backlogType := ptr.To("destination_storage") return &v1alpha1.PulsarNamespace{ ObjectMeta: metav1.ObjectMeta{ @@ -108,11 +114,13 @@ func MakePulsarNamespaceWithRateLimiting(namespace, name, namespaceName, connect ConnectionRef: corev1.LocalObjectReference{ Name: connectionName, }, - BacklogQuotaLimitTime: limitTime, - BacklogQuotaLimitSize: &backlogSize, - Bundles: &bundle, - MessageTTL: ttl, - LifecyclePolicy: policy, + BacklogQuotaLimitTime: limitTime, + BacklogQuotaLimitSize: &backlogSize, + BacklogQuotaRetentionPolicy: backlogPolicy, + BacklogQuotaType: backlogType, + Bundles: &bundle, + MessageTTL: ttl, + LifecyclePolicy: policy, // Rate limiting configurations DispatchRate: &v1alpha1.DispatchRate{