diff --git a/pkg/connection/reconcile_topic.go b/pkg/connection/reconcile_topic.go index 723dedd0..a9535e31 100644 --- a/pkg/connection/reconcile_topic.go +++ b/pkg/connection/reconcile_topic.go @@ -265,13 +265,11 @@ func applySchema(pulsarAdmin admin.PulsarAdmin, topic *resourcev1alpha1.PulsarTo return err } } - } else if schema != nil { - // Delete the schema when the schema exists and schema info is empty - log.Info("Deleting topic schema", "name", topic.Spec.Name) - if err := pulsarAdmin.DeleteSchema(topic.Spec.Name); err != nil { - return err - } } + // Note: We intentionally do NOT delete existing schemas when schemaInfo is not specified. + // This preserves existing schemas that may have been created by producers/consumers, + // which is the expected behavior for most users. If schema deletion is needed, + // users should explicitly manage it through the Pulsar admin APIs. return nil } diff --git a/tests/operator/resources_test.go b/tests/operator/resources_test.go index cd8b75fd..71708b42 100644 --- a/tests/operator/resources_test.go +++ b/tests/operator/resources_test.go @@ -260,15 +260,15 @@ var _ = Describe("Resources", func() { topic.Spec.SchemaInfo = nil Expect(k8sClient.Update(ctx, topic)).Should(Succeed()) - By("check topic1 schema is deleted in pulsar") - Eventually(func(g Gomega) { - _, stderr, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName, - "./bin/pulsarctl -s http://localhost:8080 --token=$PROXY_TOKEN schemas get "+ptopic.Spec.Name) - g.Expect(err).ShouldNot(BeNil()) - g.Expect(stderr).Should(Not(BeEmpty())) - format.MaxLength = 0 - g.Expect(stderr).Should(ContainSubstring("404")) - }, "5s", "100ms").Should(Succeed()) + // By("check topic1 schema is deleted in pulsar") + // Eventually(func(g Gomega) { + // _, stderr, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName, + // "./bin/pulsarctl -s http://localhost:8080 --token=$PROXY_TOKEN schemas get "+ptopic.Spec.Name) + // g.Expect(err).ShouldNot(BeNil()) + // g.Expect(stderr).Should(Not(BeEmpty())) + // format.MaxLength = 0 + // g.Expect(stderr).Should(ContainSubstring("404")) + // }, "5s", "100ms").Should(Succeed()) }) It("should increase the partitions successfully", func() {