@@ -82,10 +82,6 @@ function ci::install_pulsar_charts() {
8282 echo " Installing the pulsar charts ..."
8383 values=${1:- " .ci/clusters/values.yaml" }
8484 echo $values
85- if [ " $values " = " .ci/clusters/values_mesh_worker_service.yaml" ]; then
86- echo " load mesh-worker-service-integration-pulsar:latest ..."
87- kind load docker-image mesh-worker-service-integration-pulsar:latest --name sn-platform-${CLUSTER_ID}
88- fi
8985 if [ -d " pulsar-charts" ]; then
9086 rm -rf pulsar-charts
9187 fi
@@ -254,224 +250,3 @@ function ci:verify_exclamation_function() {
254250 fi
255251 return 1
256252}
257-
258- function ci::ensure_mesh_worker_service_role() {
259- ${KUBECTL} apply -f ${FUNCTION_MESH_HOME} /config/rbac/role.yaml
260- ${KUBECTL} create clusterrolebinding broker-acct-manager-role-binding --clusterrole=manager-role --serviceaccount=default:sn-platform-pulsar-broker-acct
261- }
262-
263- function ci::ensure_function_mesh_config() {
264- ${KUBECTL} apply -f ${FUNCTION_MESH_HOME} /.ci/clusters/mesh_worker_service_integration_test_pulsar_config.yaml
265- }
266-
267- function ci::verify_mesh_worker_service_pulsar_admin() {
268- ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin sinks available-sinks
269- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin sinks available-sinks)
270- if [[ $RET != * " data-generator" * ]]; then
271- return 1
272- fi
273- ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin sources available-sources
274- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin sources available-sources)
275- if [[ $RET != * " data-generator" * ]]; then
276- return 1
277- fi
278- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin sources create --name data-generator-source --source-type data-generator --destination-topic-name persistent://public/default/random-data-topic --custom-runtime-options ' {"outputTypeClassName": "org.apache.pulsar.io.datagenerator.Person"}' --source-config ' {"sleepBetweenMessages": "1000"}' )
279- echo $RET
280- if [[ $RET != * " successfully" * ]]; then
281- return 1
282- fi
283- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin sinks create --name data-generator-sink --sink-type data-generator --inputs persistent://public/default/random-data-topic --custom-runtime-options ' {"inputTypeClassName": "org.apache.pulsar.io.datagenerator.Person"}' )
284- echo $RET
285- if [[ $RET != * " successfully" * ]]; then
286- return 1
287- fi
288- ${KUBECTL} get pods -n ${NAMESPACE}
289- sleep 120
290- ${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep " data-generator-source"
291- WC=$( ${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep " data-generator-sink" | wc -l)
292- while [[ ${WC} -lt 1 ]]; do
293- echo ${WC} ;
294- sleep 20
295- ${KUBECTL} get pods -n ${NAMESPACE}
296- WC=$( ${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep " data-generator-sink" | wc -l)
297- done
298- ${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep " data-generator-source"
299- WC=$( ${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep " data-generator-source" | wc -l)
300- while [[ ${WC} -lt 1 ]]; do
301- echo ${WC} ;
302- sleep 20
303- ${KUBECTL} get pods -n ${NAMESPACE}
304- WC=$( ${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep " data-generator-source" | wc -l)
305- done
306- sleep 120
307- ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin sinks status --name data-generator-sink
308- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin sinks status --name data-generator-sink)
309- if [[ $RET != * " true" * ]]; then
310- return 1
311- fi
312- ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin sources status --name data-generator-source
313- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin sources status --name data-generator-source)
314- if [[ $RET != * " true" * ]]; then
315- return 1
316- fi
317- ${KUBECTL} get pods -n ${NAMESPACE}
318- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin sources delete --name data-generator-source)
319- echo $RET
320- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin sinks delete --name data-generator-sink)
321- echo $RET
322- ${KUBECTL} get pods -n ${NAMESPACE}
323- echo " === verify mesh worker service with empty connector config"
324- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin sources create --name data-generator-source --source-type data-generator --destination-topic-name persistent://public/default/random-data-topic --custom-runtime-options ' {"outputTypeClassName": "org.apache.pulsar.io.datagenerator.Person"}' )
325- echo $RET
326- if [[ $RET != * " successfully" * ]]; then
327- return 1
328- fi
329- WC=$( ${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep " data-generator-source" | wc -l)
330- while [[ ${WC} -lt 1 ]]; do
331- echo ${WC} ;
332- sleep 20
333- ${KUBECTL} get pods -n ${NAMESPACE}
334- WC=$( ${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep " data-generator-source" | wc -l)
335- done
336- ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin sources status --name data-generator-source
337- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin sources status --name data-generator-source)
338- if [[ $RET != * " true" * ]]; then
339- ${KUBECTL} logs -n ${NAMESPACE} data-generator-source-69865103-source-0
340- ${KUBECTL} get pods data-generator-source-69865103-source-0 -o yaml
341- return 1
342- fi
343- ${KUBECTL} get pods -n ${NAMESPACE}
344- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin sources delete --name data-generator-source)
345- echo $RET
346- ${KUBECTL} get pods -n ${NAMESPACE}
347-
348- }
349-
350- function ci::upload_java_package() {
351-
352- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin packages upload function://public/default/java-function@1.0 --path /pulsar/examples/api-examples.jar --description java-function@1.0)
353- if [[ $RET != * " successfully" * ]]; then
354- echo " ${RET} "
355- ${KUBECTL} logs -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0
356- sleep 60
357- return 1
358- fi
359- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin packages download function://public/default/java-function@1.0 --path /pulsar/api-examples.jar)
360- if [[ $RET != * " successfully" * ]]; then
361- echo " ${RET} "
362- ${KUBECTL} logs -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0
363- sleep 60
364- return 1
365- fi
366- }
367-
368- function ci::verify_java_package() {
369- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin functions create --jar function://public/default/java-function@1.0 --name package-java-fn --className org.apache.pulsar.functions.api.examples.ExclamationFunction --inputs persistent://public/default/package-java-fn-input --cpu 0.1)
370- ${KUBECTL} logs -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0
371- sleep 15
372- echo $RET
373- ${KUBECTL} logs -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0
374- sleep 15
375- ${KUBECTL} get pods -A
376- sleep 5
377- WC=$( ${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep " package-java-fn" | wc -l)
378- while [[ ${WC} -lt 1 ]]; do
379- echo ${WC} ;
380- sleep 20
381- ${KUBECTL} get pods -n ${NAMESPACE}
382- ${KUBECTL} describe pod package-java-fn-function-0
383- WC=$( ${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep " package-java-fn" | wc -l)
384- done
385- echo " java function test done"
386- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin functions delete --name package-java-fn)
387- echo " ${RET} "
388- }
389-
390- function ci::upload_python_package() {
391- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin packages upload function://public/default/python-function@1.0 --path /pulsar/examples/python-examples/exclamation_function.py --description python-function@1.0)
392- if [[ $RET != * " successfully" * ]]; then
393- echo " ${RET} "
394- return 1
395- fi
396- }
397-
398- function ci::verify_python_package() {
399- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin functions create --py function://public/default/python-function@1.0 --name package-python-fn --classname exclamation_function.ExclamationFunction --inputs persistent://public/default/package-python-fn-input --cpu 0.1)
400- if [[ $RET != * " successfully" * ]]; then
401- echo " ${RET} "
402- return 1
403- fi
404- sleep 15
405- ${KUBECTL} get pods -A
406- sleep 5
407- WC=$( ${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep " package-python-fn" | wc -l)
408- while [[ ${WC} -lt 1 ]]; do
409- echo ${WC} ;
410- sleep 20
411- ${KUBECTL} get pods -n ${NAMESPACE}
412- ${KUBECTL} describe pod package-python-fn-function-0
413- WC=$( ${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep " package-python-fn" | wc -l)
414- done
415- echo " python function test done"
416- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin functions delete --name package-python-fn)
417- echo " ${RET} "
418- }
419-
420- function ci::upload_go_package() {
421- ${KUBECTL} cp " ${FUNCTION_MESH_HOME} /.ci/examples/go-examples" " ${NAMESPACE} /${CLUSTER} -pulsar-broker-0:/pulsar/examples"
422- sleep 1
423- ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- ls -l /pulsar/examples/go-examples
424- sleep 1
425- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin packages upload function://public/default/go-function@1.0 --path /pulsar/examples/go-examples/exclamationFunc --description go-function@1.0)
426- if [[ $RET != * " successfully" * ]]; then
427- echo " ${RET} "
428- return 1
429- fi
430- }
431-
432- function ci::verify_go_package() {
433- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin functions create --go function://public/default/go-function@1.0 --name package-go-fn --inputs persistent://public/default/package-go-fn-input --cpu 0.1)
434- if [[ $RET != * " successfully" * ]]; then
435- echo " ${RET} "
436- return 1
437- fi
438- sleep 15
439- ${KUBECTL} get pods -A
440- sleep 5
441- WC=$( ${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep " package-go-fn" | wc -l)
442- while [[ ${WC} -lt 1 ]]; do
443- echo ${WC} ;
444- sleep 20
445- ${KUBECTL} get pods -n ${NAMESPACE}
446- ${KUBECTL} describe pod package-go-fn-function-0
447- WC=$( ${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep " package-go-fn" | wc -l)
448- done
449- echo " go function test done"
450- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin functions delete --name package-go-fn)
451- echo " ${RET} "
452- }
453-
454- function ci::create_java_function_by_upload() {
455- ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- cat conf/functions_worker.yml
456- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin functions create --jar /pulsar/examples/api-examples.jar --name package-upload-java-fn --className org.apache.pulsar.functions.api.examples.ExclamationFunction --inputs persistent://public/default/package-upload-java-fn-input --cpu 0.1)
457- ${KUBECTL} logs -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0
458- sleep 15
459- echo $RET
460- ${KUBECTL} logs -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0
461- sleep 15
462- ${KUBECTL} get pods -A
463- sleep 5
464- WC=$( ${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep " package-upload-java-fn" | wc -l)
465- while [[ ${WC} -lt 1 ]]; do
466- echo ${WC} ;
467- sleep 20
468- ${KUBECTL} get pods -n ${NAMESPACE}
469- ${KUBECTL} describe pod package-upload-java-fn-function-0
470- WC=$( ${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running | grep " package-upload-java-fn" | wc -l)
471- done
472- echo " java function test done"
473- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin functions delete --name package-upload-java-fn)
474- echo " ${RET} "
475- RET=$( ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER} -pulsar-broker-0 -- bin/pulsar-admin packages get-metadata function://public/default/package-upload-java-fn@latest)
476- echo " ${RET} "
477- }
0 commit comments