@@ -33,6 +33,22 @@ if [ ! "$KUBECONFIG" ]; then
3333fi
3434
3535manifests_file=" ${BASE_DIR} " /.ci/tests/integration/cases/logging-window-function/manifests.yaml
36+ input_topic=" persistent://public/default/window-function-input-topic"
37+ output_topic=" persistent://public/default/window-function-output-topic"
38+ log_topic=" persistent://public/default/window-function-logs"
39+
40+ function delete_topic_if_exists() {
41+ topic=$1
42+ kubectl exec -n " ${PULSAR_NAMESPACE} " " ${PULSAR_RELEASE_NAME} " -pulsar-broker-0 -- bin/pulsar-admin topics delete " ${topic} " -f > /dev/null 2>&1 || true
43+ kubectl exec -n " ${PULSAR_NAMESPACE} " " ${PULSAR_RELEASE_NAME} " -pulsar-broker-0 -- bin/pulsar-admin topics delete-partitioned-topic " ${topic} " -f > /dev/null 2>&1 || true
44+ kubectl exec -n " ${PULSAR_NAMESPACE} " " ${PULSAR_RELEASE_NAME} " -pulsar-broker-0 -- bin/pulsar-admin topics delete " ${topic} -partition-0" -f > /dev/null 2>&1 || true
45+ }
46+
47+ kubectl delete -f " ${manifests_file} " > /dev/null 2>&1 || true
48+ kubectl wait -l compute.functionmesh.io/name=window-function-sample --for=delete pod --timeout=2m > /dev/null 2>&1 || true
49+ delete_topic_if_exists " ${input_topic} "
50+ delete_topic_if_exists " ${output_topic} "
51+ delete_topic_if_exists " ${log_topic} "
3652
3753kubectl apply -f " ${manifests_file} " > /dev/null 2>&1
3854
@@ -51,7 +67,7 @@ if [ $? -ne 0 ]; then
5167 exit 1
5268fi
5369
54- verify_java_result=$( NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data " persistent://public/default/window-function-input-topic " " test-message" 3 2>&1 )
70+ verify_java_result=$( NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data " ${input_topic} " " test-message" 3 2>&1 )
5571if [ $? -ne 0 ]; then
5672 echo " $verify_java_result "
5773 kubectl delete -f " ${manifests_file} " > /dev/null 2>&1 || true
6177sleep 3
6278
6379# the 3 messages will not be processed, so backlog should be 3
64- verify_backlog_result=$( NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_backlog " persistent://public/default/window-function-input-topic " " public/default/window-function-sample" 3 2>&1 )
80+ verify_backlog_result=$( NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_backlog " ${input_topic} " " public/default/window-function-sample" 3 2>&1 )
6581if [ $? -ne 0 ]; then
6682 echo " $verify_backlog_result "
6783 kubectl delete -f " ${manifests_file} " > /dev/null 2>&1 || true
6884 exit 1
6985fi
7086
7187# it will fire the window with first 5 messages when get the 5th message, and then fire again with 10 messages when get 10th message
72- verify_java_result=$( NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data " persistent://public/default/window-function-input-topic " " test-message" 7 2>&1 )
88+ verify_java_result=$( NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data " ${input_topic} " " test-message" 7 2>&1 )
7389if [ $? -ne 0 ]; then
7490 echo " $verify_java_result "
7591 kubectl delete -f " ${manifests_file} " > /dev/null 2>&1 || true
87103# exit 1
88104# fi
89105
90- verify_log_result=$( kubectl logs -l compute.functionmesh.io/name=window-function-sample --tail=-1 | grep -e " -window-log" | wc -l)
91- if [ $verify_log_result -eq 15 ]; then
106+ verify_log_result=0
107+ for attempt in $( seq 1 30) ; do
108+ verify_log_result=$( kubectl logs -l compute.functionmesh.io/name=window-function-sample --tail=-1 | grep -e " -window-log" | wc -l)
109+ if [ " $verify_log_result " -eq 15 ]; then
110+ break
111+ fi
112+ sleep 2
113+ done
114+
115+ if [ " $verify_log_result " -eq 15 ]; then
92116 sub_name=$( echo $RANDOM | md5sum | head -c 20; echo ; )
93- verify_log_topic_result=$( kubectl exec -n ${PULSAR_NAMESPACE} ${PULSAR_RELEASE_NAME} -pulsar-broker-0 -- bin/pulsar-client consume -n 15 -s $sub_name --subscription-position Earliest " persistent://public/default/window-function-logs " | grep -e " -window-log" | wc -l)
94- if [ $verify_log_topic_result -ne 0 ]; then
117+ verify_log_topic_result=$( kubectl exec -n ${PULSAR_NAMESPACE} ${PULSAR_RELEASE_NAME} -pulsar-broker-0 -- bin/pulsar-client consume -n 15 -s $sub_name --subscription-position Earliest " ${log_topic} " | grep -e " -window-log" | wc -l)
118+ if [ " $verify_log_topic_result " -eq 15 ]; then
95119 echo " e2e-test: ok" | yq eval -
96120 else
97- echo " $verify_log_topic_result "
121+ echo " expected 15 window log topic messages, got ${verify_log_topic_result} " >&2
122+ kubectl delete -f " ${manifests_file} " > /dev/null 2>&1 || true
123+ exit 1
98124 fi
99125else
100- echo " $verify_log_result "
126+ echo " expected 15 window log lines, got ${verify_log_result} " >&2
127+ kubectl delete -f " ${manifests_file} " > /dev/null 2>&1 || true
128+ exit 1
101129fi
102130
103131kubectl delete -f " ${manifests_file} " > /dev/null 2>&1 || true
0 commit comments