Skip to content

Commit f685189

Browse files
Address Kafka review comments
1 parent 23d062d commit f685189

4 files changed

Lines changed: 29 additions & 3 deletions

File tree

controllers/spec/utils.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,7 @@ func makeKafkaInputSpecs(function *v1alpha1.Function) map[string]interface{} {
250250
key := kafkaInputSpecKey(function, topic)
251251
spec, ok := inputSpecs[key].(map[string]interface{})
252252
if !ok {
253-
spec = map[string]interface{}{}
254-
inputSpecs[key] = spec
253+
continue
255254
}
256255
spec["kafka_schema"] = makeKafkaSchema(function.Spec.Input.SourceSpecs[topic].SchemaType, &schemaConfig)
257256
}

controllers/spec/utils_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,9 @@ func TestConvertFunctionDetailsWithKafkaConfig(t *testing.T) {
181181
"orders": {
182182
Type: stringPtr("json"),
183183
},
184+
"unknown-topic": {
185+
Type: stringPtr("json"),
186+
},
184187
},
185188
OutputSchemaConfig: &v1alpha1.KafkaSchemaConfig{
186189
Type: stringPtr("json"),
@@ -197,7 +200,9 @@ func TestConvertFunctionDetailsWithKafkaConfig(t *testing.T) {
197200
assert.Equal(t, "earliest", kafkaConfig["consumer_config"].(map[string]interface{})["auto.offset.reset"])
198201
assert.Equal(t, "kafka:9092", kafkaConfig["producer_config"].(map[string]interface{})["bootstrap.servers"])
199202
assert.Equal(t, float64(5), kafkaConfig["producer_config"].(map[string]interface{})["linger.ms"])
200-
assert.Equal(t, "json", kafkaConfig["input_specs"].(map[string]interface{})["orders"].(map[string]interface{})["kafka_schema"].(map[string]interface{})["type"])
203+
inputSpecs := kafkaConfig["input_specs"].(map[string]interface{})
204+
assert.Equal(t, "json", inputSpecs["orders"].(map[string]interface{})["kafka_schema"].(map[string]interface{})["type"])
205+
assert.NotContains(t, inputSpecs, "unknown-topic")
201206
assert.Equal(t, "json", kafkaConfig["output_specs"].(map[string]interface{})["enriched-orders"].(map[string]interface{})["kafka_schema"].(map[string]interface{})["type"])
202207
}
203208

pkg/webhook/validate.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,12 @@ func validateFunctionMessaging(spec *v1alpha1.FunctionSpec) *field.Error {
461461
return field.Invalid(field.NewPath("spec").Child("kafka", "bootstrapServers"),
462462
spec.Kafka.BootstrapServers, "kafka.bootstrapServers needs to be set")
463463
}
464+
if spec.Kafka.AuthConfig != nil && spec.Kafka.AuthConfig.PlainAuthConfig != nil &&
465+
spec.Kafka.AuthConfig.PlainAuthConfig.SecretName == "" {
466+
return field.Invalid(field.NewPath("spec").Child("kafka", "authConfig", "plainAuthConfig", "secretName"),
467+
spec.Kafka.AuthConfig.PlainAuthConfig.SecretName,
468+
"kafka.authConfig.plainAuthConfig.secretName needs to be set")
469+
}
464470
return nil
465471
}
466472
return validateMessaging(&spec.Messaging)

pkg/webhook/validate_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,22 @@ func TestValidateFunctionMessagingRejectsMissingKafkaBootstrapServers(t *testing
4646
}
4747
}
4848

49+
func TestValidateFunctionMessagingRejectsMissingKafkaPlainAuthSecretName(t *testing.T) {
50+
err := validateFunctionMessaging(&v1alpha1.FunctionSpec{
51+
Messaging: v1alpha1.Messaging{
52+
Kafka: &v1alpha1.KafkaMessaging{
53+
BootstrapServers: "kafka:9092",
54+
AuthConfig: &v1alpha1.KafkaAuthConfig{
55+
PlainAuthConfig: &v1alpha1.KafkaPlainAuthConfig{},
56+
},
57+
},
58+
},
59+
})
60+
if err == nil || !strings.Contains(err.Error(), "kafka.authConfig.plainAuthConfig.secretName needs to be set") {
61+
t.Fatalf("expected missing kafka plain auth secretName error, got %v", err)
62+
}
63+
}
64+
4965
func TestValidateKafkaMessagingUnsupportedRejectsSourceAndSink(t *testing.T) {
5066
messaging := &v1alpha1.Messaging{
5167
Pulsar: &v1alpha1.PulsarMessaging{PulsarConfig: "pulsar-config"},

0 commit comments

Comments
 (0)