Skip to content

Commit 03e349f

Browse files
authored
fix output variables for plugin job (#4528)
Signed-off-by: Patrick Zhao <zhaoyu@koderover.com>
1 parent efad106 commit 03e349f

File tree

3 files changed

+88
-35
lines changed

3 files changed

+88
-35
lines changed

pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/job_plugin.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ func (c *PluginJobCtl) complete(ctx context.Context) {
179179
}()
180180

181181
// get job outputs info from pod terminate message.
182-
if err := getJobOutputFromTerminalMsg(c.jobTaskSpec.Properties.Namespace, c.job.Name, c.job, c.workflowCtx, c.kubeclient); err != nil {
182+
if err := getJobOutputFromTerminalMsg(c.jobTaskSpec.Properties.Namespace, c.job, c.workflowCtx, c.kubeclient); err != nil {
183183
c.logger.Error(err)
184184
c.job.Error = err.Error()
185185
}

pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go

Lines changed: 76 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ const (
7878
defaultSecretEmail = "bot@koderover.com"
7979
registrySecretSuffix = "-registry-secret"
8080
workflowConfigMapRoleSA = "workflow-cm-sa"
81+
outputCollectorContainerName = "job-output-collector"
8182

8283
defaultRetryCount = 3
8384
defaultRetryInterval = time.Second * 3
@@ -215,31 +216,63 @@ func getBaseImage(buildOS, imageFrom string) string {
215216
}
216217

217218
func buildPlainJob(jobName string, resReq setting.Request, resReqSpec setting.RequestSpec, jobTask *commonmodels.JobTask, jobTaskSpec *commonmodels.JobTaskPluginSpec, workflowCtx *commonmodels.WorkflowTaskCtx, customLabels, customAnnotations map[string]string) (*batchv1.Job, error) {
218-
collectJobOutput := `OLD_IFS=$IFS
219-
export IFS=","
220-
files='%s'
221-
outputs='%s'
222-
file_arr=($files)
223-
output_arr=($outputs)
224-
IFS="$OLD_IFS"
225-
result="{"
226-
for i in ${!file_arr[@]};
227-
do
228-
file_value=$(cat ${file_arr[$i]})
229-
output_value=${output_arr[$i]}
230-
result="$result\"$output_value\":\"$file_value\","
219+
collectJobOutput := `output_pairs='%s'
220+
221+
json_escape() {
222+
printf '%%s' "$1" | sed ':a;N;$!ba;s/\\/\\\\/g;s/"/\\"/g;s/\n/\\n/g'
223+
}
224+
225+
result="["
226+
old_ifs=$IFS
227+
IFS=','
228+
for output_pair in $output_pairs; do
229+
name=${output_pair%%%%::*}
230+
file=${output_pair#*::}
231+
value=""
232+
[ -f "$file" ] && value=$(cat "$file")
233+
result="${result}{\"name\":\"$(json_escape "$name")\",\"value\":\"$(json_escape "$value")\"},"
231234
done
232-
result=$(sed 's/,$/}/' <<< $result)
233-
echo $result > %s
234-
`
235-
files := []string{}
236-
outputs := []string{}
235+
IFS=$old_ifs
236+
237+
[ "$result" = "[" ] && result="[]" || result="${result%%,}]"
238+
printf '%%s' "$result" > %s
239+
`
240+
outputFilePairs := []string{}
237241
for _, output := range jobTask.Outputs {
238242
outputFile := path.Join(job.JobOutputDir, output.Name)
239-
files = append(files, outputFile)
240-
outputs = append(outputs, output.Name)
241-
}
242-
collectJobOutputCommand := fmt.Sprintf(collectJobOutput, strings.Join(files, ","), strings.Join(outputs, ","), job.JobTerminationFile)
243+
outputFilePairs = append(outputFilePairs, fmt.Sprintf("%s::%s", output.Name, outputFile))
244+
}
245+
collectJobOutputFromSidecar := fmt.Sprintf(collectJobOutput, strings.Join(outputFilePairs, ","), job.JobTerminationFile)
246+
collectorScript := `output_pairs='%s'
247+
max_wait=%d
248+
output_file=%s
249+
250+
if [ -z "$output_pairs" ]; then
251+
printf '[]' > "$output_file"
252+
exit 0
253+
fi
254+
255+
all_exist() {
256+
old_ifs=$IFS
257+
IFS=','
258+
for output_pair in $output_pairs; do
259+
file=${output_pair#*::}
260+
[ -f "$file" ] || { IFS=$old_ifs; return 1; }
261+
done
262+
IFS=$old_ifs
263+
return 0
264+
}
265+
266+
for _ in $(seq 1 "$max_wait"); do
267+
all_exist && break
268+
sleep 1
269+
done
270+
271+
# Give producers a short window to finish file writes.
272+
sleep 1
273+
274+
%s`
275+
collectorScript = fmt.Sprintf(collectorScript, strings.Join(outputFilePairs, ","), jobTaskSpec.Properties.Timeout*60+120, job.JobTerminationFile, collectJobOutputFromSidecar)
243276

244277
labels := getJobLabelsWithCustomizeData(&JobLabel{
245278
JobType: string(jobTask.JobType),
@@ -295,14 +328,7 @@ echo $result > %s
295328
Image: jobTaskSpec.Plugin.Image,
296329
Args: jobTaskSpec.Plugin.Args,
297330
Command: jobTaskSpec.Plugin.Cmds,
298-
Lifecycle: &corev1.Lifecycle{
299-
PreStop: &corev1.LifecycleHandler{
300-
Exec: &corev1.ExecAction{
301-
Command: []string{"/bin/sh", "-c", collectJobOutputCommand},
302-
},
303-
},
304-
},
305-
Env: envs,
331+
Env: envs,
306332
VolumeMounts: []corev1.VolumeMount{
307333
{
308334
Name: "zadig-context",
@@ -321,6 +347,21 @@ echo $result > %s
321347
TerminationMessagePolicy: corev1.TerminationMessageReadFile,
322348
TerminationMessagePath: job.JobTerminationFile,
323349
},
350+
{
351+
Name: outputCollectorContainerName,
352+
Image: BusyBoxImage,
353+
ImagePullPolicy: util.ToPullPolicy(configbase.ImagePullPolicy()),
354+
Command: []string{"/bin/sh", "-c"},
355+
Args: []string{collectorScript},
356+
VolumeMounts: []corev1.VolumeMount{
357+
{
358+
Name: "zadig-output",
359+
MountPath: job.JobOutputDir,
360+
},
361+
},
362+
TerminationMessagePolicy: corev1.TerminationMessageReadFile,
363+
TerminationMessagePath: job.JobTerminationFile,
364+
},
324365
},
325366
Volumes: []corev1.Volume{
326367
{
@@ -1042,34 +1083,35 @@ func waitJobEndByCheckingConfigMap(ctx context.Context, taskTimeout <-chan time.
10421083
}
10431084
}
10441085

1045-
func getJobOutputFromTerminalMsg(namespace, containerName string, jobTask *commonmodels.JobTask, workflowCtx *commonmodels.WorkflowTaskCtx, kubeClient crClient.Client) error {
1086+
func getJobOutputFromTerminalMsg(namespace string, jobTask *commonmodels.JobTask, workflowCtx *commonmodels.WorkflowTaskCtx, kubeClient crClient.Client) error {
10461087
jobLabel := &JobLabel{
10471088
JobType: string(jobTask.JobType),
10481089
JobName: jobTask.K8sJobName,
10491090
}
1091+
10501092
outputs := []*job.JobOutput{}
10511093
ls := getJobLabels(jobLabel)
10521094
pods, err := getter.ListPods(namespace, labels.Set(ls).AsSelector(), kubeClient)
10531095
if err != nil {
10541096
return err
10551097
}
1098+
10561099
for _, pod := range pods {
10571100
ipod := wrapper.Pod(pod)
10581101
// only collect succeeed job outputs.
10591102
if !ipod.Succeeded() {
10601103
return nil
10611104
}
1105+
10621106
for _, containerStatus := range pod.Status.ContainerStatuses {
1063-
if containerStatus.Name != containerName {
1064-
continue
1065-
}
10661107
if containerStatus.State.Terminated != nil && len(containerStatus.State.Terminated.Message) != 0 {
10671108
if err := json.Unmarshal([]byte(containerStatus.State.Terminated.Message), &outputs); err != nil {
10681109
return err
10691110
}
10701111
}
10711112
}
10721113
}
1114+
10731115
writeOutputs(outputs, jobTask.Key, workflowCtx)
10741116
return nil
10751117
}

pkg/microservice/aslan/core/workflow/service/workflow/controller/job/job_plugin.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,17 @@ func (j PluginJobController) GetVariableList(jobName string, getAggregatedVariab
167167
Type: "string",
168168
IsCredential: false,
169169
})
170+
171+
for _, output := range j.jobSpec.Plugin.Outputs {
172+
if getServiceSpecificVariables {
173+
resp = append(resp, &commonmodels.KeyVal{
174+
Key: strings.Join([]string{"job", j.name, "output", output.Name}, "."),
175+
Value: "",
176+
Type: "string",
177+
IsCredential: false,
178+
})
179+
}
180+
}
170181
}
171182
return resp, nil
172183
}

0 commit comments

Comments
 (0)