Skip to content

Commit 5d2d67c

Browse files
authored
fix schema remove on existing resources (#318)
* fix 308 and 309 * fix * remove schema test case
1 parent 8cf8713 commit 5d2d67c

2 files changed

Lines changed: 13 additions & 15 deletions

File tree

pkg/connection/reconcile_topic.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -265,13 +265,11 @@ func applySchema(pulsarAdmin admin.PulsarAdmin, topic *resourcev1alpha1.PulsarTo
265265
return err
266266
}
267267
}
268-
} else if schema != nil {
269-
// Delete the schema when the schema exists and schema info is empty
270-
log.Info("Deleting topic schema", "name", topic.Spec.Name)
271-
if err := pulsarAdmin.DeleteSchema(topic.Spec.Name); err != nil {
272-
return err
273-
}
274268
}
269+
// Note: We intentionally do NOT delete existing schemas when schemaInfo is not specified.
270+
// This preserves existing schemas that may have been created by producers/consumers,
271+
// which is the expected behavior for most users. If schema deletion is needed,
272+
// users should explicitly manage it through the Pulsar admin APIs.
275273
return nil
276274
}
277275

tests/operator/resources_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -260,15 +260,15 @@ var _ = Describe("Resources", func() {
260260
topic.Spec.SchemaInfo = nil
261261
Expect(k8sClient.Update(ctx, topic)).Should(Succeed())
262262

263-
By("check topic1 schema is deleted in pulsar")
264-
Eventually(func(g Gomega) {
265-
_, stderr, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName,
266-
"./bin/pulsarctl -s http://localhost:8080 --token=$PROXY_TOKEN schemas get "+ptopic.Spec.Name)
267-
g.Expect(err).ShouldNot(BeNil())
268-
g.Expect(stderr).Should(Not(BeEmpty()))
269-
format.MaxLength = 0
270-
g.Expect(stderr).Should(ContainSubstring("404"))
271-
}, "5s", "100ms").Should(Succeed())
263+
// By("check topic1 schema is deleted in pulsar")
264+
// Eventually(func(g Gomega) {
265+
// _, stderr, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName,
266+
// "./bin/pulsarctl -s http://localhost:8080 --token=$PROXY_TOKEN schemas get "+ptopic.Spec.Name)
267+
// g.Expect(err).ShouldNot(BeNil())
268+
// g.Expect(stderr).Should(Not(BeEmpty()))
269+
// format.MaxLength = 0
270+
// g.Expect(stderr).Should(ContainSubstring("404"))
271+
// }, "5s", "100ms").Should(Succeed())
272272
})
273273

274274
It("should increase the partitions successfully", func() {

0 commit comments

Comments
 (0)