Skip to content

Commit f3a2f15

Browse files
authored
Merge pull request #1408 from calypr/chore/fix_file_mapper
Fix input/output duplication in `file_mapper`
2 parents 9a88891 + e4244e2 commit f3a2f15

5 files changed

Lines changed: 184 additions & 4 deletions

File tree

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ require (
113113
github.com/shirou/gopsutil/v4 v4.26.3 // indirect
114114
github.com/shopspring/decimal v1.4.0 // indirect
115115
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0 // indirect
116+
mvdan.cc/sh/v3 v3.13.1 // indirect
116117
)
117118

118119
require (

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -954,6 +954,8 @@ k8s.io/kube-openapi v0.0.0-20260127142750-a19766b6e2d4 h1:HhDfevmPS+OalTjQRKbTHp
954954
k8s.io/kube-openapi v0.0.0-20260127142750-a19766b6e2d4/go.mod h1:kdmbQkyfwUagLfXIad1y2TdrjPFWp2Q89B3qkRwf/pQ=
955955
k8s.io/utils v0.0.0-20260210185600-b8788abfbbc2 h1:AZYQSJemyQB5eRxqcPky+/7EdBj0xi3g0ZcxxJ7vbWU=
956956
k8s.io/utils v0.0.0-20260210185600-b8788abfbbc2/go.mod h1:xDxuJ0whA3d0I4mf/C4ppKHxXynQ+fxnkmQH0vTHnuk=
957+
mvdan.cc/sh/v3 v3.13.1 h1:DP3TfgZhDkT7lerUdnp6PTGKyxxzz6T+cOlY/xEvfWk=
958+
mvdan.cc/sh/v3 v3.13.1/go.mod h1:lXJ8SexMvEVcHCoDvAGLZgFJ9Wsm2sulmoNEXGhYZD0=
957959
pgregory.net/rapid v1.2.0 h1:keKAYRcjm+e1F0oAuU5F5+YPAWcyxNNRK2wud503Gnk=
958960
pgregory.net/rapid v1.2.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04=
959961
sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 h1:IpInykpT6ceI+QxKBbEflcR5EXP7sU1kvOlxwZh5txg=

worker/file_mapper.go

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,27 @@ func (mapper *FileMapper) ContainerPath(src string) string {
479479
return p
480480
}
481481

482+
// mergeVolume inserts vol into vols using the same RW-deduplication rules as
483+
// AddVolume: skip if already covered by an existing mount, replace a narrower
484+
// existing mount with the wider one, otherwise append.
485+
func mergeVolume(vols []Volume, vol Volume, mapper *FileMapper) []Volume {
486+
for i, v := range vols {
487+
if vol == v {
488+
return vols
489+
}
490+
if !vol.Readonly && !v.Readonly {
491+
if mapper.IsSubpath(vol.ContainerPath, v.ContainerPath) {
492+
return vols
493+
}
494+
if mapper.IsSubpath(v.ContainerPath, vol.ContainerPath) {
495+
vols[i] = vol
496+
return vols
497+
}
498+
}
499+
}
500+
return append(vols, vol)
501+
}
502+
482503
// consolidateVolumes optimizes container mounts by replacing individual input
483504
// file mounts with a single read-write ancestor directory mount.
484505
//
@@ -528,11 +549,12 @@ func (mapper *FileMapper) consolidateVolumes() {
528549
// All inputs share a common ancestor — one mount covers them all.
529550
// WorkDir+ancestor is valid by the HostPath() construction invariant
530551
// (every input host path == WorkDir + container path).
531-
nonInputVols = append(nonInputVols, Volume{
552+
inputAncestorVol := Volume{
532553
HostPath: filepath.Join(mapper.WorkDir, ancestor),
533554
ContainerPath: ancestor,
534555
Readonly: false,
535-
})
556+
}
557+
nonInputVols = mergeVolume(nonInputVols, inputAncestorVol, mapper)
536558
} else {
537559
// Inputs span disjoint subtrees; fall back to promoting each to its
538560
// immediate parent directory to at least eliminate file-level mounts.
@@ -545,11 +567,11 @@ func (mapper *FileMapper) consolidateVolumes() {
545567
continue
546568
}
547569
seen[key] = true
548-
nonInputVols = append(nonInputVols, Volume{
570+
nonInputVols = mergeVolume(nonInputVols, Volume{
549571
HostPath: hostParent,
550572
ContainerPath: contParent,
551573
Readonly: false,
552-
})
574+
}, mapper)
553575
}
554576
}
555577
mapper.Volumes = nonInputVols

worker/file_mapper_test.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,3 +187,139 @@ func TestMapTask(t *testing.T) {
187187
t.Fatal("path unmapping failed")
188188
}
189189
}
190+
191+
// TestMapTaskInputOutputSameDirNoDuplicateVolume reproduces the bug where a
192+
// task with both an input and an output that resolve to the same container
193+
// directory produced a duplicate volume mount (invalid in Kubernetes).
194+
func TestMapTaskInputOutputSameDirNoDuplicateVolume(t *testing.T) {
195+
tmp, err := os.MkdirTemp("", "funnel-test-mapper-dedup")
196+
if err != nil {
197+
t.Fatal(err)
198+
}
199+
defer os.RemoveAll(tmp)
200+
201+
f := FileMapper{WorkDir: tmp}
202+
203+
task := &tes.Task{
204+
Inputs: []*tes.Input{
205+
{
206+
Name: "in1",
207+
Path: "/data/input.txt",
208+
Content: "hello",
209+
},
210+
},
211+
Outputs: []*tes.Output{
212+
{
213+
Name: "out1",
214+
Url: "file:///out/output.txt",
215+
Path: "/data/output.txt",
216+
Type: tes.FileType_FILE,
217+
},
218+
},
219+
}
220+
221+
if err := f.MapTask(task); err != nil {
222+
t.Fatal(err)
223+
}
224+
225+
// Verify no two volumes share the same ContainerPath (the K8s duplicate mount error).
226+
seen := map[string]int{}
227+
for _, v := range f.Volumes {
228+
seen[v.ContainerPath]++
229+
}
230+
for path, count := range seen {
231+
if count > 1 {
232+
t.Errorf("duplicate ContainerPath %q appears %d times in Volumes (causes K8s invalid mountPath)", path, count)
233+
}
234+
}
235+
236+
// /data should appear exactly once and cover both the input and the output.
237+
if seen["/data"] != 1 {
238+
t.Errorf("expected /data to appear exactly once in Volumes, got %d; volumes: %+v", seen["/data"], f.Volumes)
239+
}
240+
}
241+
242+
// TestMapTaskInputOutputOverlapNoDuplicateVolume tests the variant where the
243+
// output directory is a parent of the input directory.
244+
func TestMapTaskInputOutputOverlapNoDuplicateVolume(t *testing.T) {
245+
tmp, err := os.MkdirTemp("", "funnel-test-mapper-overlap")
246+
if err != nil {
247+
t.Fatal(err)
248+
}
249+
defer os.RemoveAll(tmp)
250+
251+
f := FileMapper{WorkDir: tmp}
252+
253+
// Input under /data/inputs/, output dir is /data (parent of input ancestor).
254+
task := &tes.Task{
255+
Inputs: []*tes.Input{
256+
{
257+
Name: "in1",
258+
Path: "/data/inputs/file.txt",
259+
Content: "hello",
260+
},
261+
},
262+
Outputs: []*tes.Output{
263+
{
264+
Name: "out1",
265+
Url: "file:///out/result",
266+
Path: "/data/result",
267+
Type: tes.FileType_DIRECTORY,
268+
},
269+
},
270+
}
271+
272+
if err := f.MapTask(task); err != nil {
273+
t.Fatal(err)
274+
}
275+
276+
seen := map[string]int{}
277+
for _, v := range f.Volumes {
278+
seen[v.ContainerPath]++
279+
}
280+
for path, count := range seen {
281+
if count > 1 {
282+
t.Errorf("duplicate ContainerPath %q appears %d times in Volumes", path, count)
283+
}
284+
}
285+
}
286+
287+
// TestMapTaskMultipleInputsAndOutputs mirrors the concrete failure case from
288+
// the bug report: one input and one output with a /tmp volume also present.
289+
func TestMapTaskMultipleInputsAndOutputs(t *testing.T) {
290+
tmp, err := os.MkdirTemp("", "funnel-test-mapper-multi")
291+
if err != nil {
292+
t.Fatal(err)
293+
}
294+
defer os.RemoveAll(tmp)
295+
296+
f := FileMapper{WorkDir: tmp}
297+
298+
task := &tes.Task{
299+
Inputs: []*tes.Input{
300+
{Name: "a", Path: "/data/a.txt", Content: "a"},
301+
{Name: "b", Path: "/data/b.txt", Content: "b"},
302+
},
303+
Outputs: []*tes.Output{
304+
{Name: "out", Url: "file:///out/c.txt", Path: "/data/c.txt", Type: tes.FileType_FILE},
305+
},
306+
}
307+
308+
if err := f.MapTask(task); err != nil {
309+
t.Fatal(err)
310+
}
311+
312+
seen := map[string]int{}
313+
for _, v := range f.Volumes {
314+
seen[v.ContainerPath]++
315+
}
316+
for path, count := range seen {
317+
if count > 1 {
318+
t.Errorf("duplicate ContainerPath %q appears %d times in Volumes", path, count)
319+
}
320+
}
321+
322+
if seen["/data"] != 1 {
323+
t.Errorf("expected /data once in Volumes, got %d; volumes: %+v", seen["/data"], f.Volumes)
324+
}
325+
}

worker/kubernetes.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"k8s.io/client-go/kubernetes/scheme"
2020
batchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
2121
"k8s.io/client-go/rest"
22+
"mvdan.cc/sh/v3/syntax"
2223
)
2324

2425
// KubernetesCommand is responsible for configuring and running a task in a Kubernetes cluster.
@@ -112,6 +113,14 @@ func (kcmd KubernetesCommand) Run(ctx context.Context) error {
112113
cmd = []string{shellCmd}
113114
}
114115

116+
// Validate single-element shell scripts before passing them to /bin/sh -c.
117+
// Multi-element commands are exec'd directly and bypass the shell entirely.
118+
if len(cmd) == 1 && !hasRedirects {
119+
if err := validateShellSyntax(cmd[0]); err != nil {
120+
return err
121+
}
122+
}
123+
115124
// Use a shell wrapper when the command is a single element (a shell script
116125
// string) or when stdio redirects are present.
117126
useShell := len(cmd) == 1 || hasRedirects
@@ -444,3 +453,13 @@ func getKubernetesClientset() (*kubernetes.Clientset, error) {
444453
clientset, err := kubernetes.NewForConfig(kubeconfig)
445454
return clientset, err
446455
}
456+
457+
// validateShellSyntax returns an error if s is not valid POSIX shell syntax.
458+
// Used to catch issues like unterminated quotes before passing single-element
459+
// commands to /bin/sh -c. Multi-element commands bypass the shell entirely.
460+
func validateShellSyntax(s string) error {
461+
if _, err := syntax.NewParser().Parse(strings.NewReader(s), ""); err != nil {
462+
return fmt.Errorf("single-element command is not valid shell syntax (consider using a multi-element command array instead): %w", err)
463+
}
464+
return nil
465+
}

0 commit comments

Comments
 (0)