@@ -19,7 +19,6 @@ package spec
1919
2020import (
2121 "encoding/json"
22- "regexp"
2322 "strings"
2423 "testing"
2524
@@ -198,11 +197,9 @@ func TestJavaFunctionCommandWithConnectorOverrides(t *testing.T) {
198197 startCommand := commands [2 ]
199198 assert .Assert (t , strings .Contains (startCommand , "--connectors_directory " + DefaultConnectorsDirectory ),
200199 "start command should include connectors directory but got %s" , startCommand )
201- re := regexp .MustCompile (`--function_details '([^']+)'` )
202- matches := re .FindStringSubmatch (startCommand )
203- assert .Assert (t , len (matches ) == 2 , "unable to locate function details in command: %s" , startCommand )
204-
205- functionDetailsJSON := matches [1 ]
200+ functionDetailsJSON := generateFunctionDetailsInJSON (function )
201+ assert .Assert (t , strings .Contains (startCommand , "--function_details " + shellQuoteLiteral (functionDetailsJSON )),
202+ "start command should include shell quoted function details but got %s" , startCommand )
206203 details := & proto.FunctionDetails {}
207204 err := protojson .Unmarshal ([]byte (functionDetailsJSON ), details )
208205 assert .NilError (t , err )
@@ -232,6 +229,116 @@ func TestJavaFunctionCommandWithConnectorOverrides(t *testing.T) {
232229 assert .Equal (t , producerConfig ["enable.idempotence" ], true )
233230}
234231
232+ func TestSinkCommandShellQuotesFunctionDetails (t * testing.T ) {
233+ replicas := int32 (1 )
234+ trueVal := true
235+ sinkConfig := v1alpha1 .NewConfig (map [string ]interface {}{
236+ "partitionerType" : "time" ,
237+ "timePartitionDuration" : "1m" ,
238+ "timePartitionPattern" : "yyyy-MM-dd/HH'h'-mm'm'" ,
239+ })
240+ sink := & v1alpha1.Sink {
241+ TypeMeta : metav1.TypeMeta {
242+ Kind : "Sink" ,
243+ APIVersion : "compute.functionmesh.io/v1alpha1" ,
244+ },
245+ ObjectMeta : * makeSampleObjectMeta ("time-pattern-sink" ),
246+ Spec : v1alpha1.SinkSpec {
247+ Name : "time-pattern-sink" ,
248+ ClassName : "org.apache.pulsar.io.jcloud.sink.CloudStorageGenericRecordSink" ,
249+ Tenant : "public" ,
250+ Namespace : "default" ,
251+ ClusterName : TestClusterName ,
252+ Input : v1alpha1.InputConf {
253+ Topics : []string {
254+ "persistent://public/default/input" ,
255+ },
256+ TypeClassName : "org.apache.pulsar.client.api.schema.GenericRecord" ,
257+ },
258+ SinkConfig : & sinkConfig ,
259+ Replicas : & replicas ,
260+ AutoAck : & trueVal ,
261+ Messaging : v1alpha1.Messaging {
262+ Pulsar : & v1alpha1.PulsarMessaging {
263+ PulsarConfig : TestClusterName ,
264+ },
265+ },
266+ Runtime : v1alpha1.Runtime {
267+ Java : & v1alpha1.JavaRuntime {
268+ Jar : "connectors/pulsar-io-cloud-storage.nar" ,
269+ JarLocation : "" ,
270+ },
271+ },
272+ Image : "streamnative/pulsar-io-cloud-storage:latest" ,
273+ },
274+ }
275+
276+ commands := MakeSinkCommand (sink )
277+ assert .Assert (t , len (commands ) == 3 , "commands should be 3 but got %d" , len (commands ))
278+
279+ sinkDetailsJSON := generateSinkDetailsInJSON (sink )
280+ assert .Assert (t , strings .Contains (commands [2 ], "--function_details " + shellQuoteLiteral (sinkDetailsJSON )),
281+ "sink command should include shell quoted function details but got %s" , commands [2 ])
282+ assert .Assert (t , strings .Contains (commands [2 ], `'"'"'` ),
283+ "sink command should escape embedded single quotes but got %s" , commands [2 ])
284+ }
285+
286+ func TestSinkCommandShellQuotesClientAuthParams (t * testing.T ) {
287+ replicas := int32 (1 )
288+ trueVal := true
289+ authParams := `{"token":"a'b"}`
290+ sink := & v1alpha1.Sink {
291+ TypeMeta : metav1.TypeMeta {
292+ Kind : "Sink" ,
293+ APIVersion : "compute.functionmesh.io/v1alpha1" ,
294+ },
295+ ObjectMeta : * makeSampleObjectMeta ("auth-config-sink" ),
296+ Spec : v1alpha1.SinkSpec {
297+ Name : "auth-config-sink" ,
298+ ClassName : "org.apache.pulsar.io.elasticsearch.ElasticSearchSink" ,
299+ Tenant : "public" ,
300+ Namespace : "default" ,
301+ ClusterName : TestClusterName ,
302+ Input : v1alpha1.InputConf {
303+ Topics : []string {
304+ "persistent://public/default/input" ,
305+ },
306+ TypeClassName : "[B" ,
307+ },
308+ SinkConfig : & v1alpha1.Config {
309+ Data : map [string ]interface {}{
310+ "elasticSearchUrl" : "http://localhost:9200" ,
311+ },
312+ },
313+ Replicas : & replicas ,
314+ AutoAck : & trueVal ,
315+ Messaging : v1alpha1.Messaging {
316+ Pulsar : & v1alpha1.PulsarMessaging {
317+ PulsarConfig : TestClusterName ,
318+ AuthConfig : & v1alpha1.AuthConfig {
319+ GenericAuth : & v1alpha1.GenericAuth {
320+ ClientAuthenticationPlugin : "auth-plugin" ,
321+ ClientAuthenticationParameters : authParams ,
322+ },
323+ },
324+ },
325+ },
326+ Runtime : v1alpha1.Runtime {
327+ Java : & v1alpha1.JavaRuntime {
328+ Jar : "connectors/pulsar-io-elastic-search.nar" ,
329+ JarLocation : "" ,
330+ },
331+ },
332+ Image : "streamnative/pulsar-io-elastic-search:latest" ,
333+ },
334+ }
335+
336+ commands := MakeSinkCommand (sink )
337+ assert .Assert (t , len (commands ) == 3 , "commands should be 3 but got %d" , len (commands ))
338+ assert .Assert (t , strings .Contains (commands [2 ], "--client_auth_params " + shellQuoteLiteral (authParams )),
339+ "sink command should shell quote client auth params but got %s" , commands [2 ])
340+ }
341+
235342func TestFunctionPulsarPackageServiceDownloadCommandAndPodWiring (t * testing.T ) {
236343 previous := utils .EnableInitContainers
237344 defer func () {
0 commit comments