Skip to content

Commit 22713e7

Browse files
Address Kafka validation review comments
1 parent f685189 commit 22713e7

6 files changed

Lines changed: 113 additions & 57 deletions

File tree

controllers/spec/function.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,9 @@ func makeFunctionLabels(function *v1alpha1.Function) map[string]string {
237237

238238
func makeFunctionCommand(function *v1alpha1.Function) []string {
239239
spec := function.Spec
240+
if isKafkaFunction(function) && (spec.GenericRuntime == nil || spec.GenericRuntime.FunctionFile == "") {
241+
return nil
242+
}
240243
downloadConfig := NewDownloadServiceConfig(spec.PulsarPackageService, spec.Pulsar)
241244
pulsarAuthConfig := functionPulsarAuthConfig(function)
242245
pulsarTLSConfig := functionPulsarTLSConfig(function)

controllers/spec/function_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,18 @@ func TestGenericFunctionCommandUsesKafkaMessaging(t *testing.T) {
552552
assert.Assert(t, hasKafkaOAuthMount, "container should include kafka oauth2 mount")
553553
}
554554

555+
func TestFunctionCommandDoesNotBuildPulsarRuntimeCommandForKafkaJavaFunction(t *testing.T) {
556+
function := makeFunctionSample("java-kafka-command")
557+
function.Spec.Messaging = v1alpha1.Messaging{
558+
Kafka: &v1alpha1.KafkaMessaging{
559+
BootstrapServers: "kafka:9092",
560+
},
561+
}
562+
563+
command := makeFunctionCommand(function)
564+
assert.Assert(t, command == nil, "kafka messaging with java runtime should not build a pulsar command, got %v", command)
565+
}
566+
555567
func TestFunctionPulsarPackageServiceDownloadFallbackToMessaging(t *testing.T) {
556568
previous := utils.EnableInitContainers
557569
defer func() {

controllers/spec/utils.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,15 +189,16 @@ func makeKafkaConfig(function *v1alpha1.Function) map[string]interface{} {
189189
if kafka == nil {
190190
return nil
191191
}
192+
securityProtocol := kafkaSecurityProtocol(kafka)
192193
consumerConfig := map[string]interface{}{
193194
"bootstrap.servers": kafka.BootstrapServers,
194-
"security.protocol": kafkaSecurityProtocol(kafka),
195+
"security.protocol": securityProtocol,
195196
}
196197
addConfigData(consumerConfig, kafka.ConsumerConfig)
197198

198199
producerConfig := map[string]interface{}{
199200
"bootstrap.servers": kafka.BootstrapServers,
200-
"security.protocol": kafkaSecurityProtocol(kafka),
201+
"security.protocol": securityProtocol,
201202
}
202203
addConfigData(producerConfig, kafka.ProducerConfig)
203204

@@ -252,7 +253,7 @@ func makeKafkaInputSpecs(function *v1alpha1.Function) map[string]interface{} {
252253
if !ok {
253254
continue
254255
}
255-
spec["kafka_schema"] = makeKafkaSchema(function.Spec.Input.SourceSpecs[topic].SchemaType, &schemaConfig)
256+
spec["kafka_schema"] = makeKafkaSchema(kafkaSchemaType(spec["kafka_schema"]), &schemaConfig)
256257
}
257258
return inputSpecs
258259
}
@@ -307,6 +308,15 @@ func makeKafkaSchema(defaultType string, schemaConfig *v1alpha1.KafkaSchemaConfi
307308
return schema
308309
}
309310

311+
func kafkaSchemaType(schema interface{}) string {
312+
if schemaMap, ok := schema.(map[string]interface{}); ok {
313+
if schemaType, ok := schemaMap["type"].(string); ok {
314+
return schemaType
315+
}
316+
}
317+
return ""
318+
}
319+
310320
func kafkaInputSchemaConfig(function *v1alpha1.Function, topic string) *v1alpha1.KafkaSchemaConfig {
311321
if function.Spec.Kafka == nil {
312322
return nil

controllers/spec/utils_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,37 @@ func TestConvertFunctionDetailsWithKafkaConfig(t *testing.T) {
206206
assert.Equal(t, "json", kafkaConfig["output_specs"].(map[string]interface{})["enriched-orders"].(map[string]interface{})["kafka_schema"].(map[string]interface{})["type"])
207207
}
208208

209+
func TestConvertFunctionDetailsKafkaInputSchemaConfigKeepsDeclaredTopicType(t *testing.T) {
210+
function := makeFunctionSample("generic-kafka")
211+
function.Spec.Runtime = v1alpha1.Runtime{
212+
GenericRuntime: &v1alpha1.GenericRuntime{
213+
FunctionFile: "/pulsar/function.py",
214+
Language: "python",
215+
},
216+
}
217+
function.Spec.Input = v1alpha1.InputConf{
218+
TypeClassName: "string",
219+
Topics: []string{"orders"},
220+
}
221+
function.Spec.Kafka = &v1alpha1.KafkaMessaging{
222+
BootstrapServers: "kafka:9092",
223+
InputSchemaConfigs: map[string]v1alpha1.KafkaSchemaConfig{
224+
"orders": {
225+
Subject: stringPtr("orders-value"),
226+
},
227+
},
228+
}
229+
230+
details := convertFunctionDetails(function)
231+
userConfig := map[string]interface{}{}
232+
assert.NoError(t, json.Unmarshal([]byte(details.UserConfig), &userConfig))
233+
kafkaConfig := userConfig["_kafka_config"].(map[string]interface{})
234+
inputSpecs := kafkaConfig["input_specs"].(map[string]interface{})
235+
kafkaSchema := inputSpecs["orders"].(map[string]interface{})["kafka_schema"].(map[string]interface{})
236+
assert.Equal(t, "string", kafkaSchema["type"])
237+
assert.Equal(t, "orders-value", kafkaSchema["subject"])
238+
}
239+
209240
func stringPtr(value string) *string {
210241
return &value
211242
}

pkg/webhook/validate.go

Lines changed: 29 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,16 @@ func validateSecretsMap(secrets map[string]v1alpha1.SecretRef) *field.Error {
232232

233233
func validateInputOutput(input *v1alpha1.InputConf, output *v1alpha1.OutputConf,
234234
skipInputValidation bool, skipOutputValidation bool) []*field.Error {
235+
return validateInputOutputWithTopicNameValidation(input, output, skipInputValidation, skipOutputValidation, true)
236+
}
237+
238+
func validateKafkaInputOutput(input *v1alpha1.InputConf, output *v1alpha1.OutputConf,
239+
skipInputValidation bool, skipOutputValidation bool) []*field.Error {
240+
return validateInputOutputWithTopicNameValidation(input, output, skipInputValidation, skipOutputValidation, false)
241+
}
242+
243+
func validateInputOutputWithTopicNameValidation(input *v1alpha1.InputConf, output *v1alpha1.OutputConf,
244+
skipInputValidation bool, skipOutputValidation bool, validateTopicNames bool) []*field.Error {
235245
var allErrs field.ErrorList
236246
allInputTopics := []string{}
237247
if input != nil {
@@ -243,12 +253,14 @@ func validateInputOutput(input *v1alpha1.InputConf, output *v1alpha1.OutputConf,
243253
allErrs = append(allErrs, e)
244254
}
245255

246-
for _, topic := range allInputTopics {
247-
err := isValidTopicName(topic)
248-
if err != nil {
249-
e := field.Invalid(field.NewPath("spec").Child("input"), *input,
250-
fmt.Sprintf("Input topic %s is invalid", topic))
251-
allErrs = append(allErrs, e)
256+
if validateTopicNames {
257+
for _, topic := range allInputTopics {
258+
err := isValidTopicName(topic)
259+
if err != nil {
260+
e := field.Invalid(field.NewPath("spec").Child("input"), *input,
261+
fmt.Sprintf("Input topic %s is invalid", topic))
262+
allErrs = append(allErrs, e)
263+
}
252264
}
253265
}
254266

@@ -270,11 +282,13 @@ func validateInputOutput(input *v1alpha1.InputConf, output *v1alpha1.OutputConf,
270282

271283
if output != nil && !skipOutputValidation {
272284
if output.Topic != "" {
273-
err := isValidTopicName(output.Topic)
274-
if err != nil {
275-
e := field.Invalid(field.NewPath("spec").Child("output", "topic"), output.Topic,
276-
fmt.Sprintf("Output topic %s is invalid", output.Topic))
277-
allErrs = append(allErrs, e)
285+
if validateTopicNames {
286+
err := isValidTopicName(output.Topic)
287+
if err != nil {
288+
e := field.Invalid(field.NewPath("spec").Child("output", "topic"), output.Topic,
289+
fmt.Sprintf("Output topic %s is invalid", output.Topic))
290+
allErrs = append(allErrs, e)
291+
}
278292
}
279293
for _, v := range allInputTopics {
280294
if v == output.Topic {
@@ -307,49 +321,6 @@ func validateInputOutput(input *v1alpha1.InputConf, output *v1alpha1.OutputConf,
307321
return allErrs
308322
}
309323

310-
func validateKafkaInputOutput(input *v1alpha1.InputConf, output *v1alpha1.OutputConf,
311-
skipInputValidation bool, skipOutputValidation bool) []*field.Error {
312-
var allErrs field.ErrorList
313-
allInputTopics := []string{}
314-
if input != nil {
315-
allInputTopics = collectAllInputTopics(*input)
316-
if !skipInputValidation {
317-
if len(allInputTopics) == 0 {
318-
e := field.Invalid(field.NewPath("spec").Child("input"), *input,
319-
"No input topic(s) specified for the function")
320-
allErrs = append(allErrs, e)
321-
}
322-
323-
for topicName, conf := range input.SourceSpecs {
324-
if conf.ReceiverQueueSize != nil && *conf.ReceiverQueueSize < 0 {
325-
e := field.Invalid(field.NewPath("spec").Child("input", "sourceSpecs"),
326-
input.SourceSpecs, fmt.Sprintf("%s receiver queue size should be >= zero", topicName))
327-
allErrs = append(allErrs, e)
328-
}
329-
330-
if conf.CryptoConfig != nil && conf.CryptoConfig.CryptoKeyReaderClassName == "" {
331-
e := field.Invalid(field.NewPath("spec").Child("input", "sourceSpecs"),
332-
input.SourceSpecs, fmt.Sprintf("%s cryptoKeyReader class name required", topicName))
333-
allErrs = append(allErrs, e)
334-
}
335-
}
336-
}
337-
}
338-
339-
if output != nil && !skipOutputValidation && output.Topic != "" {
340-
for _, v := range allInputTopics {
341-
if v == output.Topic {
342-
e := field.Invalid(field.NewPath("spec").Child("output", "topic"), output.Topic,
343-
fmt.Sprintf("Output topic %s is also being used as an input topic (topics must be one or the other)",
344-
output.Topic))
345-
allErrs = append(allErrs, e)
346-
}
347-
}
348-
}
349-
350-
return allErrs
351-
}
352-
353324
func validateLogTopic(logTopic string) *field.Error {
354325
if logTopic != "" {
355326
err := isValidTopicName(logTopic)
@@ -488,6 +459,10 @@ func validateKafkaMessagingRuntime(runtime v1alpha1.Runtime, kafka *v1alpha1.Kaf
488459
return field.Invalid(field.NewPath("spec").Child("kafka"), kafka,
489460
"only genericRuntime supports kafka messaging")
490461
}
462+
if runtime.GenericRuntime.FunctionFile == "" {
463+
return field.Invalid(field.NewPath("spec").Child("runtime", "genericRuntime", "functionFile"),
464+
runtime.GenericRuntime.FunctionFile, "genericRuntime.functionFile needs to be set for kafka messaging")
465+
}
491466
return nil
492467
}
493468

pkg/webhook/validate_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,15 @@ func TestValidateKafkaMessagingRuntimeRequiresGenericRuntime(t *testing.T) {
8585
}
8686
}
8787

88+
func TestValidateKafkaMessagingRuntimeRequiresGenericRuntimeFunctionFile(t *testing.T) {
89+
err := validateKafkaMessagingRuntime(v1alpha1.Runtime{
90+
GenericRuntime: &v1alpha1.GenericRuntime{},
91+
}, &v1alpha1.KafkaMessaging{BootstrapServers: "kafka:9092"})
92+
if err == nil || !strings.Contains(err.Error(), "genericRuntime.functionFile needs to be set for kafka messaging") {
93+
t.Fatalf("expected kafka genericRuntime function file error, got %v", err)
94+
}
95+
}
96+
8897
func TestValidateKafkaInputOutputAllowsKafkaTopicNames(t *testing.T) {
8998
errs := validateKafkaInputOutput(&v1alpha1.InputConf{
9099
Topics: []string{"orders"},
@@ -95,3 +104,19 @@ func TestValidateKafkaInputOutputAllowsKafkaTopicNames(t *testing.T) {
95104
t.Fatalf("expected kafka topic names to be valid, got %v", errs)
96105
}
97106
}
107+
108+
func TestValidateKafkaInputOutputValidatesOutputProducerCryptoConfig(t *testing.T) {
109+
errs := validateKafkaInputOutput(&v1alpha1.InputConf{
110+
Topics: []string{"orders"},
111+
}, &v1alpha1.OutputConf{
112+
Topic: "enriched-orders",
113+
ProducerConf: &v1alpha1.ProducerConfig{
114+
CryptoConfig: &v1alpha1.CryptoConfig{
115+
EncryptionKeys: []string{"key"},
116+
},
117+
},
118+
}, false, false)
119+
if len(errs) == 0 || !strings.Contains(errs[0].Error(), "cryptoKeyReader class name required") {
120+
t.Fatalf("expected kafka output crypto validation error, got %v", errs)
121+
}
122+
}

0 commit comments

Comments
 (0)