Skip to content

Commit 7dacb77

Browse files
Skip Kafka function cleanup jobs
1 parent 22713e7 commit 7dacb77

5 files changed

Lines changed: 49 additions & 5 deletions

File tree

controllers/function.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -354,8 +354,16 @@ func (r *FunctionReconciler) ApplyFunctionVPA(ctx context.Context, function *v1a
354354
}
355355

356356
func (r *FunctionReconciler) ApplyFunctionCleanUpJob(ctx context.Context, function *v1alpha1.Function) error {
357+
desiredJob := spec.MakeFunctionCleanUpJob(function)
358+
hasCleanupFinalizer := containsCleanupFinalizer(function.ObjectMeta.Finalizers)
359+
if desiredJob == nil {
360+
if hasCleanupFinalizer {
361+
function.ObjectMeta.Finalizers = removeCleanupFinalizer(function.ObjectMeta.Finalizers)
362+
return r.Update(ctx, function)
363+
}
364+
return nil
365+
}
357366
if !spec.NeedCleanup(function) {
358-
desiredJob := spec.MakeFunctionCleanUpJob(function)
359367
if err := r.Delete(ctx, desiredJob, getBackgroundDeletionPolicy()); err != nil {
360368
if errors.IsNotFound(err) {
361369
return nil
@@ -367,12 +375,10 @@ func (r *FunctionReconciler) ApplyFunctionCleanUpJob(ctx context.Context, functi
367375
}
368376
return nil
369377
}
370-
hasCleanupFinalizer := containsCleanupFinalizer(function.ObjectMeta.Finalizers)
371378
if function.Spec.CleanupSubscription {
372379
// add finalizer if function is updated to clean up subscription
373380
if function.ObjectMeta.DeletionTimestamp.IsZero() {
374381
if !hasCleanupFinalizer {
375-
desiredJob := spec.MakeFunctionCleanUpJob(function)
376382
if _, err := ctrl.CreateOrUpdate(ctx, r.Client, desiredJob, func() error {
377383
return nil
378384
}); err != nil {
@@ -387,7 +393,6 @@ func (r *FunctionReconciler) ApplyFunctionCleanUpJob(ctx context.Context, functi
387393
}
388394
}
389395
} else {
390-
desiredJob := spec.MakeFunctionCleanUpJob(function)
391396
// if function is deleting, send an "INT" signal to the cleanup job to clean up subscription
392397
if hasCleanupFinalizer {
393398
if err := spec.TriggerCleanup(ctx, r.Client, r.RestClient, r.Config, desiredJob); err != nil {
@@ -413,7 +418,6 @@ func (r *FunctionReconciler) ApplyFunctionCleanUpJob(ctx context.Context, functi
413418
return err
414419
}
415420

416-
desiredJob := spec.MakeFunctionCleanUpJob(function)
417421
// delete the cleanup job
418422
if err := r.Delete(ctx, desiredJob, getBackgroundDeletionPolicy()); err != nil {
419423
return err

controllers/spec/function.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ func MakeFunctionObjectMeta(function *v1alpha1.Function) *metav1.ObjectMeta {
101101
}
102102

103103
func MakeFunctionCleanUpJob(function *v1alpha1.Function) *v1.Job {
104+
if isKafkaFunction(function) || function.Spec.Pulsar == nil {
105+
return nil
106+
}
104107
labels := makeFunctionLabels(function)
105108
labels["owner"] = string(function.GetUID())
106109
objectMeta := &metav1.ObjectMeta{

controllers/spec/function_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,25 @@ func TestFunctionCommandDoesNotBuildPulsarRuntimeCommandForKafkaJavaFunction(t *
564564
assert.Assert(t, command == nil, "kafka messaging with java runtime should not build a pulsar command, got %v", command)
565565
}
566566

567+
func TestMakeFunctionCleanUpJobSkipsKafkaFunction(t *testing.T) {
568+
function := makeFunctionSample("generic-kafka-cleanup")
569+
function.Spec.Messaging = v1alpha1.Messaging{
570+
Kafka: &v1alpha1.KafkaMessaging{
571+
BootstrapServers: "kafka:9092",
572+
},
573+
}
574+
function.Spec.Runtime = v1alpha1.Runtime{
575+
GenericRuntime: &v1alpha1.GenericRuntime{
576+
FunctionFile: "/pulsar/function.py",
577+
Language: "python",
578+
},
579+
}
580+
function.Spec.CleanupSubscription = true
581+
582+
cleanupJob := MakeFunctionCleanUpJob(function)
583+
assert.Assert(t, cleanupJob == nil, "kafka function should not create pulsar cleanup job")
584+
}
585+
567586
func TestFunctionPulsarPackageServiceDownloadFallbackToMessaging(t *testing.T) {
568587
previous := utils.EnableInitContainers
569588
defer func() {

pkg/webhook/validate.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,10 @@ func validateFunctionMessaging(spec *v1alpha1.FunctionSpec) *field.Error {
438438
spec.Kafka.AuthConfig.PlainAuthConfig.SecretName,
439439
"kafka.authConfig.plainAuthConfig.secretName needs to be set")
440440
}
441+
if spec.CleanupSubscription {
442+
return field.Invalid(field.NewPath("spec").Child("cleanupSubscription"), spec.CleanupSubscription,
443+
"cleanupSubscription only supports pulsar messaging")
444+
}
441445
return nil
442446
}
443447
return validateMessaging(&spec.Messaging)

pkg/webhook/validate_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,20 @@ func TestValidateFunctionMessagingRejectsMissingKafkaPlainAuthSecretName(t *test
6262
}
6363
}
6464

65+
func TestValidateFunctionMessagingRejectsKafkaCleanupSubscription(t *testing.T) {
66+
err := validateFunctionMessaging(&v1alpha1.FunctionSpec{
67+
CleanupSubscription: true,
68+
Messaging: v1alpha1.Messaging{
69+
Kafka: &v1alpha1.KafkaMessaging{
70+
BootstrapServers: "kafka:9092",
71+
},
72+
},
73+
})
74+
if err == nil || !strings.Contains(err.Error(), "cleanupSubscription only supports pulsar messaging") {
75+
t.Fatalf("expected kafka cleanupSubscription error, got %v", err)
76+
}
77+
}
78+
6579
func TestValidateKafkaMessagingUnsupportedRejectsSourceAndSink(t *testing.T) {
6680
messaging := &v1alpha1.Messaging{
6781
Pulsar: &v1alpha1.PulsarMessaging{PulsarConfig: "pulsar-config"},

0 commit comments

Comments
 (0)