Skip to content

Commit fa5e3dc

Browse files
committed
fix: render TES task volumes as emptyDir in K8s executor jobs
1 parent b433b6a commit fa5e3dc

6 files changed

Lines changed: 214 additions & 11 deletions

File tree

compute/kubernetes/backend.go

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -254,9 +254,10 @@ func (b *Backend) createResources(ctx context.Context, task *tes.Task, config *c
254254
}
255255
}
256256

257-
// If the task has inputs, outputs, or declared volumes, create a PVC so
258-
// executor pods can share data via PVC subPath mounts.
259-
if len(task.Inputs) > 0 || len(task.Outputs) > 0 || len(task.Volumes) > 0 {
257+
// If the task has inputs or outputs, create a PVC so executor pods can
258+
// share data via PVC subPath mounts. TES task.Volumes are rendered as
259+
// emptyDir volumes in the executor job template and do not need a PVC.
260+
if len(task.Inputs) > 0 || len(task.Outputs) > 0 {
260261
b.log.Debug("creating Worker PV", "taskID", task.Id)
261262

262263
// Check to make sure required configs are present

compute/kubernetes/resources/job.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -82,7 +82,7 @@ func CreateJob(ctx context.Context, task *tes.Task, conf *config.Config, client
8282
"DiskGb": res.GetDiskGb(),
8383
"Image": image,
8484
"BackoffLimit": backoffLimit,
85-
"NeedsPVC": len(task.Inputs) > 0 || len(task.Outputs) > 0 || len(task.Volumes) > 0,
85+
"NeedsPVC": len(task.Inputs) > 0 || len(task.Outputs) > 0,
8686
"NodeSelector": conf.Kubernetes.NodeSelector,
8787
"Tolerations": conf.Kubernetes.Tolerations,
8888
"ServiceAccountName": fmt.Sprintf("funnel-worker-sa-%s-%s", conf.Kubernetes.JobsNamespace, task.Id),

config/kubernetes/executor-job.yaml

Lines changed: 13 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -69,16 +69,24 @@ spec:
6969

7070
volumeMounts:
7171
{{- if .NeedsPVC }}
72-
{{- range $idx, $item := .Volumes}}
73-
- name: funnel-storage-{{$.TaskId}}
74-
mountPath: {{$item.ContainerPath}}
75-
subPath: {{$.TaskId}}{{$item.ContainerPath}}
76-
{{- end}}
72+
{{- range $idx, $item := .Volumes}}
73+
- name: funnel-storage-{{$.TaskId}}
74+
mountPath: {{$item.ContainerPath}}
75+
subPath: {{$.TaskId}}{{$item.ContainerPath}}
76+
{{- end}}
7777
{{- end }}
78+
{{- range $idx, $vol := .TaskVolumes}}
79+
- name: funnel-vol-{{$idx}}
80+
mountPath: {{$vol}}
81+
{{- end}}
7882

7983
volumes:
8084
{{- if .NeedsPVC }}
8185
- name: funnel-storage-{{.TaskId}}
8286
persistentVolumeClaim:
8387
claimName: funnel-worker-pvc-{{.TaskId}}
8488
{{- end }}
89+
{{- range $idx, $vol := .TaskVolumes}}
90+
- name: funnel-vol-{{$idx}}
91+
emptyDir: {}
92+
{{- end}}

worker/kubernetes.go

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@ type KubernetesCommand struct {
3838
ResourceLimits *tes.Resources
3939
ServiceAccount string
4040
NeedsPVC bool
41+
TaskVolumes []string // TES task.Volumes container paths; rendered as emptyDir volumes
4142
Clientset kubernetes.Interface
4243
Command
4344
}
@@ -150,6 +151,7 @@ func (kcmd KubernetesCommand) Run(ctx context.Context) error {
150151
"UseShell": useShell,
151152
"Workdir": kcmd.Workdir,
152153
"Volumes": kcmd.Volumes,
154+
"TaskVolumes": kcmd.TaskVolumes,
153155
"Env": kcmd.Env,
154156
"Cpus": kcmd.Resources.CpuCores,
155157
"RamGb": kcmd.Resources.RamGb,

worker/kubernetes_volumes_test.go

Lines changed: 176 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,176 @@
1+
package worker
2+
3+
import (
4+
"bytes"
5+
"os"
6+
"testing"
7+
"text/template"
8+
9+
batchv1 "k8s.io/api/batch/v1"
10+
corev1 "k8s.io/api/core/v1"
11+
"k8s.io/client-go/kubernetes/scheme"
12+
)
13+
14+
// renderExecutorJobWithVolumes is a test helper that renders the executor job
// template and returns the decoded Job.
//
// It reads ../config/kubernetes/executor-job.yaml (a relative path), parses it
// with text/template, executes it against a fixed set of placeholder values,
// and decodes the rendered output with the client-go scheme deserializer.
//
// Only three template inputs vary with the arguments:
//   - taskVolumes is passed to the template as "TaskVolumes"
//   - pvcVolumes is passed to the template as "Volumes"
//   - needsPVC is passed to the template as "NeedsPVC"
//
// Any failure (read, parse, execute, decode, or a decoded object that is not a
// *batchv1.Job) aborts the calling test via t.Fatalf.
func renderExecutorJobWithVolumes(t *testing.T, taskVolumes []string, pvcVolumes []Volume, needsPVC bool) *batchv1.Job {
15+
t.Helper()
16+
17+
tmplBytes, err := os.ReadFile("../config/kubernetes/executor-job.yaml")
18+
if err != nil {
19+
t.Fatalf("read executor-job.yaml: %v", err)
20+
}
21+
22+
tpl, err := template.New("executor").Parse(string(tmplBytes))
23+
if err != nil {
24+
t.Fatalf("parse template: %v", err)
25+
}
26+
27+
// Placeholder template data; only Volumes, TaskVolumes and NeedsPVC come
// from the caller.
data := map[string]interface{}{
28+
"TaskId": "task-abc",
29+
"JobId": 0,
30+
"Namespace": "test-ns",
31+
"JobsNamespace": "test-ns",
32+
"Command": []string{"echo ok"},
33+
"UseShell": false,
34+
"Workdir": "/",
35+
"Volumes": pvcVolumes,
36+
"TaskVolumes": taskVolumes,
37+
"Env": map[string]string{},
38+
"Cpus": "100m",
39+
"RamGb": "128Mi",
40+
"DiskGb": "1Gi",
41+
"CpusLimit": "0",
42+
"RamGbLimit": "0",
43+
"DiskGbLimit": "0",
44+
"Image": "alpine",
45+
"NeedsPVC": needsPVC,
46+
"NodeSelector": nil,
47+
"Tolerations": nil,
48+
"ServiceAccountName": "funnel-sa-test-ns",
49+
}
50+
51+
var buf bytes.Buffer
52+
// On failure, include whatever was rendered so far to help locate the
// offending template section.
if err := tpl.Execute(&buf, data); err != nil {
53+
t.Fatalf("execute template: %v\nrendered:\n%s", err, buf.String())
54+
}
55+
56+
// Decode the rendered manifest into a typed object; the rendered text is
// echoed on failure for debugging.
obj, _, err := scheme.Codecs.UniversalDeserializer().Decode(buf.Bytes(), nil, nil)
57+
if err != nil {
58+
t.Fatalf("decode rendered template: %v\nrendered:\n%s", err, buf.String())
59+
}
60+
61+
job, ok := obj.(*batchv1.Job)
62+
if !ok {
63+
t.Fatalf("decoded object is not Job: %T", obj)
64+
}
65+
return job
66+
}
67+
68+
// TestTaskVolumesRenderedAsEmptyDir verifies that TES task.Volumes are rendered
69+
// as emptyDir volumes (not PVC mounts) in the executor job pod spec.
//
// The template is rendered with two task volume paths, no PVC volumes and
// NeedsPVC=false. The test then requires exactly two pod volumes (each with a
// non-nil EmptyDir source) and exactly two volumeMounts on the first
// container, whose mountPaths cover both requested paths.
70+
func TestTaskVolumesRenderedAsEmptyDir(t *testing.T) {
71+
job := renderExecutorJobWithVolumes(t,
72+
[]string{"/vol", "/tmp/shared"},
73+
nil, // no PVC volumes
74+
false, // no PVC needed
75+
)
76+
77+
podSpec := job.Spec.Template.Spec
78+
79+
// Expect two volumes, both emptyDir
80+
if len(podSpec.Volumes) != 2 {
81+
t.Fatalf("expected 2 volumes, got %d: %+v", len(podSpec.Volumes), podSpec.Volumes)
82+
}
83+
for _, v := range podSpec.Volumes {
84+
if v.EmptyDir == nil {
85+
t.Errorf("volume %q should be emptyDir, got: %+v", v.Name, v.VolumeSource)
86+
}
87+
}
88+
89+
// Expect two volumeMounts in the container
// (the length check also guards the Containers[0] index below).
90+
if len(podSpec.Containers) == 0 {
91+
t.Fatal("no containers in pod spec")
92+
}
93+
mounts := podSpec.Containers[0].VolumeMounts
94+
if len(mounts) != 2 {
95+
t.Fatalf("expected 2 volumeMounts, got %d: %+v", len(mounts), mounts)
96+
}
97+
98+
// Collect the rendered mountPaths into a set, then require each requested
// task volume path to be present.
mountPaths := map[string]bool{}
99+
for _, m := range mounts {
100+
mountPaths[m.MountPath] = true
101+
}
102+
for _, vol := range []string{"/vol", "/tmp/shared"} {
103+
if !mountPaths[vol] {
104+
t.Errorf("expected mountPath %q not found in mounts: %+v", vol, mounts)
105+
}
106+
}
107+
}
108+
109+
// TestTaskVolumesDoNotUsePVC verifies that when a task only has volumes (no
110+
// inputs/outputs), no PVC volume appears in the pod spec.
//
// The template is rendered with a single task volume path, no PVC volumes and
// NeedsPVC=false; the test fails if any rendered pod volume has a
// PersistentVolumeClaim source.
111+
func TestTaskVolumesDoNotUsePVC(t *testing.T) {
112+
job := renderExecutorJobWithVolumes(t,
113+
[]string{"/data"},
114+
nil,
115+
false,
116+
)
117+
118+
// Every rendered volume must be free of a PVC source.
for _, v := range job.Spec.Template.Spec.Volumes {
119+
if v.PersistentVolumeClaim != nil {
120+
t.Errorf("unexpected PVC volume %q when task has only TES volumes", v.Name)
121+
}
122+
}
123+
}
124+
125+
// TestTaskVolumesAndPVCCoexist verifies that a task with both inputs (PVC) and
126+
// TES volumes renders both a PVC mount and emptyDir mounts without conflict.
//
// The template is rendered with one PVC-backed Volume, one task volume path
// ("/vol") and NeedsPVC=true. The test requires at least one emptyDir volume
// and at least one PVC volume, no mountPath repeated among the first
// container's volumeMounts, and no subPath on the mount at "/vol".
127+
func TestTaskVolumesAndPVCCoexist(t *testing.T) {
128+
pvcVols := []Volume{
129+
{HostPath: "/work/inputs/file.txt", ContainerPath: "/inputs/file.txt", Readonly: false},
130+
}
131+
job := renderExecutorJobWithVolumes(t,
132+
[]string{"/vol"},
133+
pvcVols,
134+
true, // has PVC for inputs
135+
)
136+
137+
podSpec := job.Spec.Template.Spec
138+
139+
// Record which volume source kinds appear among the rendered pod volumes.
var hasEmptyDir, hasPVC bool
140+
for _, v := range podSpec.Volumes {
141+
if v.EmptyDir != nil {
142+
hasEmptyDir = true
143+
}
144+
if v.PersistentVolumeClaim != nil {
145+
hasPVC = true
146+
}
147+
}
148+
149+
if !hasEmptyDir {
150+
t.Error("expected at least one emptyDir volume for TES task volume")
151+
}
152+
if !hasPVC {
153+
t.Error("expected PVC volume for inputs")
154+
}
155+
156+
// Verify no mountPath appears twice
// NOTE(review): Containers[0] is indexed here without the len(Containers)
// guard used in TestTaskVolumesRenderedAsEmptyDir; an empty container list
// would panic instead of producing a test failure message.
157+
mounts := podSpec.Containers[0].VolumeMounts
158+
seen := map[string]int{}
159+
for _, m := range mounts {
160+
seen[m.MountPath]++
161+
}
162+
for path, count := range seen {
163+
if count > 1 {
164+
t.Errorf("mountPath %q appears %d times — duplicate mount", path, count)
165+
}
166+
}
167+
168+
// Verify the emptyDir mount does not use a subPath (subPath is PVC-specific)
169+
for _, m := range mounts {
170+
if m.MountPath == "/vol" && m.SubPath != "" {
171+
t.Errorf("emptyDir mount at /vol should not have subPath, got %q", m.SubPath)
172+
}
173+
}
174+
175+
_ = corev1.EmptyDirVolumeSource{} // ensure corev1 import used
176+
}

worker/worker.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,21 @@ func (r *DefaultWorker) Run(pctx context.Context) (runerr error) {
226226
// return fmt.Errorf("error writing resources event for task %s: %v", task.Id, err)
227227
}
228228

229+
// Build a set of TES task volume paths so they can be excluded
230+
// from PVC mounts — TES volumes are rendered as emptyDir instead.
231+
taskVolSet := make(map[string]bool, len(task.GetVolumes()))
232+
for _, v := range task.GetVolumes() {
233+
taskVolSet[v] = true
234+
}
235+
var pvcVolumes []Volume
236+
for _, v := range mapper.Volumes {
237+
if !taskVolSet[v.ContainerPath] {
238+
pvcVolumes = append(pvcVolumes, v)
239+
}
240+
}
241+
pvcCommand := command
242+
pvcCommand.Volumes = pvcVolumes
243+
229244
taskCommand = &KubernetesCommand{
230245
TaskId: task.Id,
231246
JobId: i,
@@ -237,8 +252,9 @@ func (r *DefaultWorker) Run(pctx context.Context) (runerr error) {
237252
JobsNamespace: r.Executor.JobsNamespace,
238253
Resources: resources,
239254
ResourceLimits: resourceLimits,
240-
Command: command,
241-
NeedsPVC: len(task.GetInputs()) > 0 || len(task.GetOutputs()) > 0 || len(task.GetVolumes()) > 0,
255+
Command: pvcCommand,
256+
NeedsPVC: len(task.GetInputs()) > 0 || len(task.GetOutputs()) > 0,
257+
TaskVolumes: task.GetVolumes(),
242258
NodeSelector: r.Executor.NodeSelector,
243259
Tolerations: r.Executor.Tolerations,
244260
ServiceAccount: fmt.Sprintf("funnel-worker-sa-%s-%s", r.Executor.JobsNamespace, task.Id),

0 commit comments

Comments
 (0)