diff --git a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_plugin.go b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_plugin.go index af3e5d1ca7..b6ea44d351 100644 --- a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_plugin.go +++ b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_plugin.go @@ -179,7 +179,7 @@ func (c *PluginJobCtl) complete(ctx context.Context) { }() // get job outputs info from pod terminate message. - if err := getJobOutputFromTerminalMsg(c.jobTaskSpec.Properties.Namespace, c.job.Name, c.job, c.workflowCtx, c.kubeclient); err != nil { + if err := getJobOutputFromTerminalMsg(c.jobTaskSpec.Properties.Namespace, c.job, c.workflowCtx, c.kubeclient); err != nil { c.logger.Error(err) c.job.Error = err.Error() } diff --git a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go index c9f215b92b..1508ee4dda 100644 --- a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go +++ b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go @@ -78,6 +78,7 @@ const ( defaultSecretEmail = "bot@koderover.com" registrySecretSuffix = "-registry-secret" workflowConfigMapRoleSA = "workflow-cm-sa" + outputCollectorContainerName = "job-output-collector" defaultRetryCount = 3 defaultRetryInterval = time.Second * 3 @@ -215,31 +216,63 @@ func getBaseImage(buildOS, imageFrom string) string { } func buildPlainJob(jobName string, resReq setting.Request, resReqSpec setting.RequestSpec, jobTask *commonmodels.JobTask, jobTaskSpec *commonmodels.JobTaskPluginSpec, workflowCtx *commonmodels.WorkflowTaskCtx, customLabels, customAnnotations map[string]string) (*batchv1.Job, error) { - collectJobOutput := `OLD_IFS=$IFS -export IFS="," -files='%s' -outputs='%s' -file_arr=($files) -output_arr=($outputs) -IFS="$OLD_IFS" -result="{" -for i in ${!file_arr[@]}; -do - file_value=$(cat ${file_arr[$i]}) - output_value=${output_arr[$i]} - result="$result\"$output_value\":\"$file_value\"," + collectJobOutput := `output_pairs='%s' + +json_escape() { + printf '%%s' "$1" | sed ':a;N;$!ba;s/\\/\\\\/g;s/"/\\"/g;s/\n/\\n/g' +} + +result="[" +old_ifs=$IFS +IFS=',' +for output_pair in $output_pairs; do + name=${output_pair%%%%::*} + file=${output_pair#*::} + value="" + [ -f "$file" ] && value=$(cat "$file") + result="${result}{\"name\":\"$(json_escape "$name")\",\"value\":\"$(json_escape "$value")\"}," done -result=$(sed 's/,$/}/' <<< $result) -echo $result > %s -` - files := []string{} - outputs := []string{} +IFS=$old_ifs + +[ "$result" = "[" ] && result="[]" || result="${result%%,}]" +printf '%%s' "$result" > %s + ` + outputFilePairs := []string{} for _, output := range jobTask.Outputs { outputFile := path.Join(job.JobOutputDir, output.Name) - files = append(files, outputFile) - outputs = append(outputs, output.Name) - } - collectJobOutputCommand := fmt.Sprintf(collectJobOutput, strings.Join(files, ","), strings.Join(outputs, ","), job.JobTerminationFile) + outputFilePairs = append(outputFilePairs, fmt.Sprintf("%s::%s", output.Name, outputFile)) + } + collectJobOutputFromSidecar := fmt.Sprintf(collectJobOutput, strings.Join(outputFilePairs, ","), job.JobTerminationFile) + collectorScript := `output_pairs='%s' +max_wait=%d +output_file=%s + +if [ -z "$output_pairs" ]; then + printf '[]' > "$output_file" + exit 0 +fi + +all_exist() { + old_ifs=$IFS + IFS=',' + for output_pair in $output_pairs; do + file=${output_pair#*::} + [ -f "$file" ] || { IFS=$old_ifs; return 1; } + done + IFS=$old_ifs + return 0 +} + +for _ in $(seq 1 "$max_wait"); do + all_exist && break + sleep 1 +done + +# Give producers a short window to finish file writes. +sleep 1 + +%s` + collectorScript = fmt.Sprintf(collectorScript, strings.Join(outputFilePairs, ","), jobTaskSpec.Properties.Timeout*60+120, job.JobTerminationFile, collectJobOutputFromSidecar) labels := getJobLabelsWithCustomizeData(&JobLabel{ JobType: string(jobTask.JobType), @@ -295,14 +328,7 @@ echo $result > %s Image: jobTaskSpec.Plugin.Image, Args: jobTaskSpec.Plugin.Args, Command: jobTaskSpec.Plugin.Cmds, - Lifecycle: &corev1.Lifecycle{ - PreStop: &corev1.LifecycleHandler{ - Exec: &corev1.ExecAction{ - Command: []string{"/bin/sh", "-c", collectJobOutputCommand}, - }, - }, - }, - Env: envs, + Env: envs, VolumeMounts: []corev1.VolumeMount{ { Name: "zadig-context", @@ -321,6 +347,21 @@ echo $result > %s TerminationMessagePolicy: corev1.TerminationMessageReadFile, TerminationMessagePath: job.JobTerminationFile, }, + { + Name: outputCollectorContainerName, + Image: BusyBoxImage, + ImagePullPolicy: util.ToPullPolicy(configbase.ImagePullPolicy()), + Command: []string{"/bin/sh", "-c"}, + Args: []string{collectorScript}, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "zadig-output", + MountPath: job.JobOutputDir, + }, + }, + TerminationMessagePolicy: corev1.TerminationMessageReadFile, + TerminationMessagePath: job.JobTerminationFile, + }, }, Volumes: []corev1.Volume{ { @@ -1042,27 +1083,27 @@ func waitJobEndByCheckingConfigMap(ctx context.Context, taskTimeout <-chan time. } } -func getJobOutputFromTerminalMsg(namespace, containerName string, jobTask *commonmodels.JobTask, workflowCtx *commonmodels.WorkflowTaskCtx, kubeClient crClient.Client) error { +func getJobOutputFromTerminalMsg(namespace string, jobTask *commonmodels.JobTask, workflowCtx *commonmodels.WorkflowTaskCtx, kubeClient crClient.Client) error { jobLabel := &JobLabel{ JobType: string(jobTask.JobType), JobName: jobTask.K8sJobName, } + outputs := []*job.JobOutput{} ls := getJobLabels(jobLabel) pods, err := getter.ListPods(namespace, labels.Set(ls).AsSelector(), kubeClient) if err != nil { return err } + for _, pod := range pods { ipod := wrapper.Pod(pod) // only collect succeeed job outputs. if !ipod.Succeeded() { return nil } + for _, containerStatus := range pod.Status.ContainerStatuses { - if containerStatus.Name != containerName { - continue - } if containerStatus.State.Terminated != nil && len(containerStatus.State.Terminated.Message) != 0 { if err := json.Unmarshal([]byte(containerStatus.State.Terminated.Message), &outputs); err != nil { return err @@ -1070,6 +1111,7 @@ func getJobOutputFromTerminalMsg(namespace, containerName string, jobTask *commo } } } + writeOutputs(outputs, jobTask.Key, workflowCtx) return nil } diff --git a/pkg/microservice/aslan/core/workflow/service/workflow/controller/job/job_plugin.go b/pkg/microservice/aslan/core/workflow/service/workflow/controller/job/job_plugin.go index 5efc574970..119bd493bf 100644 --- a/pkg/microservice/aslan/core/workflow/service/workflow/controller/job/job_plugin.go +++ b/pkg/microservice/aslan/core/workflow/service/workflow/controller/job/job_plugin.go @@ -167,6 +167,17 @@ func (j PluginJobController) GetVariableList(jobName string, getAggregatedVariab Type: "string", IsCredential: false, }) + + for _, output := range j.jobSpec.Plugin.Outputs { + if getServiceSpecificVariables { + resp = append(resp, &commonmodels.KeyVal{ + Key: strings.Join([]string{"job", j.name, "output", output.Name}, "."), + Value: "", + Type: "string", + IsCredential: false, + }) + } + } } return resp, nil }