Skip to content

Commit b364dcc

Browse files
committed
storage/archive,chrootarchive: add support for splitfdstream
Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
1 parent e332f87 commit b364dcc

4 files changed

Lines changed: 310 additions & 15 deletions

File tree

storage/pkg/archive/archive.go

Lines changed: 65 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -704,7 +704,7 @@ func (ta *tarWriter) addFile(headers *addFileData) error {
704704
return nil
705705
}
706706

707-
func extractTarFileEntry(path, extractDir string, hdr *tar.Header, reader io.Reader, Lchown bool, chownOpts *idtools.IDPair, inUserns, ignoreChownErrors bool, forceMask *os.FileMode, buffer []byte) error {
707+
func extractTarFileEntry(path, extractDir string, hdr *tar.Header, writeContent func(*os.File) error, Lchown bool, chownOpts *idtools.IDPair, inUserns, ignoreChownErrors bool, forceMask *os.FileMode) error {
708708
// hdr.Mode is in linux format, which we can use for sycalls,
709709
// but for os.Foo() calls we need the mode converted to os.FileMode,
710710
// so use hdrInfo.Mode() (they differ for e.g. setuid bits)
@@ -741,9 +741,11 @@ func extractTarFileEntry(path, extractDir string, hdr *tar.Header, reader io.Rea
741741
if err != nil {
742742
return err
743743
}
744-
if _, err := io.CopyBuffer(file, reader, buffer); err != nil {
745-
file.Close()
746-
return err
744+
if writeContent != nil {
745+
if err := writeContent(file); err != nil {
746+
file.Close()
747+
return err
748+
}
747749
}
748750
if err := file.Close(); err != nil {
749751
return err
@@ -1087,17 +1089,67 @@ func TarWithOptions(srcPath string, options *TarOptions) (io.ReadCloser, error)
10871089
return pipeReader, nil
10881090
}
10891091

1090-
// Unpack unpacks the decompressedArchive to dest with options.
1091-
func Unpack(decompressedArchive io.Reader, dest string, options *TarOptions) error {
1092+
// TarEntryIterator abstracts iteration over tar entries.
1093+
// Standard implementation wraps tar.Reader; splitfdstream provides
1094+
// entries from its chunk-based format with reflink support.
1095+
type TarEntryIterator interface {
1096+
// Next advances to the next entry and returns its header.
1097+
Next() (*tar.Header, error)
1098+
// WriteContentTo writes the current entry's file content to dst.
1099+
// Only called for TypeReg entries with Size > 0.
1100+
WriteContentTo(dst *os.File) error
1101+
}
1102+
1103+
// tarReaderIterator implements TarEntryIterator for a standard tar.Reader.
1104+
type tarReaderIterator struct {
1105+
tr *tar.Reader
1106+
trBuf *bufio.Reader
1107+
buffer []byte
1108+
}
1109+
1110+
func newTarReaderIterator(decompressedArchive io.Reader) *tarReaderIterator {
10921111
tr := tar.NewReader(decompressedArchive)
10931112
trBuf := pools.BufioReader32KPool.Get(nil)
1094-
defer pools.BufioReader32KPool.Put(trBuf)
1113+
return &tarReaderIterator{
1114+
tr: tr,
1115+
trBuf: trBuf,
1116+
buffer: make([]byte, 1<<20),
1117+
}
1118+
}
10951119

1120+
func (i *tarReaderIterator) Next() (*tar.Header, error) {
1121+
hdr, err := i.tr.Next()
1122+
if err != nil {
1123+
return nil, err
1124+
}
1125+
i.trBuf.Reset(i.tr)
1126+
return hdr, nil
1127+
}
1128+
1129+
func (i *tarReaderIterator) WriteContentTo(dst *os.File) error {
1130+
_, err := io.CopyBuffer(dst, i.trBuf, i.buffer)
1131+
return err
1132+
}
1133+
1134+
func (i *tarReaderIterator) close() {
1135+
pools.BufioReader32KPool.Put(i.trBuf)
1136+
}
1137+
1138+
// Unpack unpacks the decompressedArchive to dest with options.
1139+
func Unpack(decompressedArchive io.Reader, dest string, options *TarOptions) error {
1140+
iter := newTarReaderIterator(decompressedArchive)
1141+
defer iter.close()
1142+
return UnpackFromIterator(iter, dest, options)
1143+
}
1144+
1145+
// UnpackFromIterator unpacks tar entries from the given iterator to dest with options.
1146+
// This allows plugging in alternative sources of tar entries (e.g., splitfdstream)
1147+
// while reusing the full extraction logic including xattrs, whiteouts, device nodes, etc.
1148+
func UnpackFromIterator(iter TarEntryIterator, dest string, options *TarOptions) error {
10961149
var dirs []*tar.Header
10971150
idMappings := idtools.NewIDMappingsFromMaps(options.UIDMaps, options.GIDMaps)
10981151
rootIDs := idMappings.RootPair()
10991152
whiteoutConverter := GetWhiteoutConverter(options.WhiteoutFormat, options.WhiteoutData)
1100-
buffer := make([]byte, 1<<20)
11011153

11021154
doChown := !options.NoLchown
11031155
if options.ForceMask != nil {
@@ -1109,7 +1161,7 @@ func Unpack(decompressedArchive io.Reader, dest string, options *TarOptions) err
11091161
// Iterate through the files in the archive.
11101162
loop:
11111163
for {
1112-
hdr, err := tr.Next()
1164+
hdr, err := iter.Next()
11131165
if err == io.EOF {
11141166
// end of tar archive
11151167
break
@@ -1183,7 +1235,6 @@ loop:
11831235
}
11841236
}
11851237
}
1186-
trBuf.Reset(tr)
11871238

11881239
chownOpts := options.ChownOpts
11891240
if err := remapIDs(nil, idMappings, chownOpts, hdr); err != nil {
@@ -1204,7 +1255,10 @@ loop:
12041255
chownOpts = &idtools.IDPair{UID: hdr.Uid, GID: hdr.Gid}
12051256
}
12061257

1207-
if err = extractTarFileEntry(path, dest, hdr, trBuf, doChown, chownOpts, options.InUserNS, options.IgnoreChownErrors, options.ForceMask, buffer); err != nil {
1258+
writeContent := func(dst *os.File) error {
1259+
return iter.WriteContentTo(dst)
1260+
}
1261+
if err = extractTarFileEntry(path, dest, hdr, writeContent, doChown, chownOpts, options.InUserNS, options.IgnoreChownErrors, options.ForceMask); err != nil {
12081262
return err
12091263
}
12101264

storage/pkg/archive/archive_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -747,8 +747,7 @@ func TestTarWithOptions(t *testing.T) {
747747
func TestTypeXGlobalHeaderDoesNotFail(t *testing.T) {
748748
hdr := tar.Header{Typeflag: tar.TypeXGlobalHeader}
749749
tmpDir := t.TempDir()
750-
buffer := make([]byte, 1<<20)
751-
err := extractTarFileEntry(filepath.Join(tmpDir, "pax_global_header"), tmpDir, &hdr, nil, true, nil, false, false, nil, buffer)
750+
err := extractTarFileEntry(filepath.Join(tmpDir, "pax_global_header"), tmpDir, &hdr, nil, true, nil, false, false, nil)
752751
if err != nil {
753752
t.Fatal(err)
754753
}

storage/pkg/archive/diff.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ func UnpackLayer(dest string, layer io.Reader, options *TarOptions) (size int64,
104104
}
105105
defer os.RemoveAll(aufsTempdir)
106106
}
107-
if err := extractTarFileEntry(filepath.Join(aufsTempdir, basename), dest, hdr, tr, true, nil, options.InUserNS, options.IgnoreChownErrors, options.ForceMask, buffer); err != nil {
107+
writeContent := func(dst *os.File) error { _, err := io.CopyBuffer(dst, tr, buffer); return err }
108+
if err := extractTarFileEntry(filepath.Join(aufsTempdir, basename), dest, hdr, writeContent, true, nil, options.InUserNS, options.IgnoreChownErrors, options.ForceMask); err != nil {
108109
return 0, err
109110
}
110111
}
@@ -209,7 +210,8 @@ func UnpackLayer(dest string, layer io.Reader, options *TarOptions) (size int64,
209210
return 0, err
210211
}
211212

212-
if err := extractTarFileEntry(path, dest, srcHdr, srcData, true, nil, options.InUserNS, options.IgnoreChownErrors, options.ForceMask, buffer); err != nil {
213+
writeContent := func(dst *os.File) error { _, err := io.CopyBuffer(dst, srcData, buffer); return err }
214+
if err := extractTarFileEntry(path, dest, srcHdr, writeContent, true, nil, options.InUserNS, options.IgnoreChownErrors, options.ForceMask); err != nil {
213215
return 0, err
214216
}
215217

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
package chrootarchive
2+
3+
import (
4+
"bytes"
5+
"flag"
6+
"fmt"
7+
"io"
8+
"net"
9+
"os"
10+
"path/filepath"
11+
"runtime"
12+
13+
"go.podman.io/storage/pkg/archive"
14+
"go.podman.io/storage/pkg/fileutils"
15+
"go.podman.io/storage/pkg/idtools"
16+
"go.podman.io/storage/pkg/reexec"
17+
"go.podman.io/storage/pkg/splitfdstream"
18+
"go.podman.io/storage/pkg/system"
19+
"go.podman.io/storage/pkg/unshare"
20+
"golang.org/x/sys/unix"
21+
)
22+
23+
// splitFDStreamSocketDescriptor is the fd for the Unix socket used to
24+
// receive file descriptors via SCM_RIGHTS in the re-exec child.
25+
const splitFDStreamSocketDescriptor = 5
26+
27+
func init() {
28+
reexec.Register("storage-untar-splitfdstream", untarSplitFDStream)
29+
}
30+
31+
// UntarSplitFDStream extracts a splitfdstream into dest inside a chroot.
32+
// The stream provides tar headers as inline data, with file content
33+
// delivered via the fds slice (for reflink-based copying).
34+
// FDs are streamed to the child process one-at-a-time over a Unix socket
35+
// using SCM_RIGHTS, avoiding EMFILE from inheriting too many FDs at exec.
36+
func UntarSplitFDStream(stream io.Reader, fds []*os.File, dest string, options *archive.TarOptions) error {
37+
if stream == nil {
38+
return fmt.Errorf("empty stream")
39+
}
40+
if options == nil {
41+
options = &archive.TarOptions{}
42+
options.InUserNS = unshare.IsRootless()
43+
}
44+
45+
idMappings := idtools.NewIDMappingsFromMaps(options.UIDMaps, options.GIDMaps)
46+
rootIDs := idMappings.RootPair()
47+
48+
dest = filepath.Clean(dest)
49+
if err := fileutils.Exists(dest); os.IsNotExist(err) {
50+
if err := idtools.MkdirAllAndChownNew(dest, 0o755, rootIDs); err != nil {
51+
return err
52+
}
53+
}
54+
55+
destVal, err := newUnpackDestination(dest, dest)
56+
if err != nil {
57+
return err
58+
}
59+
defer destVal.Close()
60+
61+
return invokeUnpackSplitFDStream(stream, fds, destVal, options)
62+
}
63+
64+
// untarSplitFDStream is the re-exec entry point for "storage-untar-splitfdstream".
65+
// It runs inside a chroot and receives FDs lazily via SCM_RIGHTS from a Unix
66+
// socket, then calls archive.UnpackFromIterator for full extraction logic.
67+
func untarSplitFDStream() {
68+
runtime.LockOSThread()
69+
flag.Parse()
70+
71+
var options archive.TarOptions
72+
73+
// Read TarOptions from fd 3 (same as regular untar)
74+
if err := json.NewDecoder(os.NewFile(tarOptionsDescriptor, "options")).Decode(&options); err != nil {
75+
fatal(err)
76+
}
77+
78+
dst := flag.Arg(0)
79+
var root string
80+
if len(flag.Args()) > 1 {
81+
root = flag.Arg(1)
82+
}
83+
84+
// Handle the root fd (same pattern as regular untar)
85+
if root == procPathForFd(rootFileDescriptor) {
86+
rootFd := os.NewFile(rootFileDescriptor, "tar-root")
87+
defer rootFd.Close()
88+
if err := unix.Fchdir(int(rootFd.Fd())); err != nil {
89+
fatal(err)
90+
}
91+
root = "."
92+
} else if root == "" {
93+
root = dst
94+
}
95+
96+
if err := chroot(root); err != nil {
97+
fatal(err)
98+
}
99+
100+
// We need to be able to set any perms
101+
oldMask, err := system.Umask(0)
102+
if err != nil {
103+
fatal(err)
104+
}
105+
defer func() {
106+
_, _ = system.Umask(oldMask)
107+
}()
108+
109+
if unshare.IsRootless() {
110+
options.InUserNS = true
111+
}
112+
113+
// Set up FD receiver from the Unix socket (fd 5)
114+
sockFile := os.NewFile(splitFDStreamSocketDescriptor, "fd-socket")
115+
sockConn, err := net.FileConn(sockFile)
116+
sockFile.Close() // FileConn dups the fd
117+
if err != nil {
118+
fatal(fmt.Errorf("failed to create net.Conn from fd socket: %w", err))
119+
}
120+
unixConn, ok := sockConn.(*net.UnixConn)
121+
if !ok {
122+
sockConn.Close()
123+
fatal(fmt.Errorf("fd socket is not a Unix connection"))
124+
}
125+
defer unixConn.Close()
126+
127+
fdPasser := splitfdstream.NewFDPasser(unixConn)
128+
129+
// Create an iterator that receives FDs lazily via SCM_RIGHTS
130+
recv := func() (*os.File, error) {
131+
_, fds, err := fdPasser.ReceiveFileDescriptors(1)
132+
if err != nil {
133+
return nil, fmt.Errorf("failed to receive FD via SCM_RIGHTS: %w", err)
134+
}
135+
if len(fds) != 1 {
136+
// Close any unexpected FDs
137+
for _, f := range fds {
138+
f.Close()
139+
}
140+
return nil, fmt.Errorf("expected 1 FD, got %d", len(fds))
141+
}
142+
return fds[0], nil
143+
}
144+
145+
iter := splitfdstream.NewIteratorWithFDReceiver(os.Stdin, recv)
146+
if err := archive.UnpackFromIterator(iter, dst, &options); err != nil {
147+
fatal(err)
148+
}
149+
150+
// Fully consume stdin in case it is zero padded
151+
if _, err := flush(os.Stdin); err != nil {
152+
fatal(err)
153+
}
154+
155+
os.Exit(0)
156+
}
157+
158+
// invokeUnpackSplitFDStream forks a re-exec child process that chroots into
159+
// dest and unpacks the splitfdstream using archive.UnpackFromIterator.
160+
// FDs are sent to the child one-at-a-time over a Unix socket using SCM_RIGHTS.
161+
func invokeUnpackSplitFDStream(stream io.Reader, fds []*os.File, dest *unpackDestination, options *archive.TarOptions) error {
162+
// Create pipe for TarOptions (fd 3)
163+
optR, optW, err := os.Pipe()
164+
if err != nil {
165+
return fmt.Errorf("splitfdstream options pipe: %w", err)
166+
}
167+
168+
// Create Unix socketpair for passing FDs via SCM_RIGHTS
169+
sockFDs, err := unix.Socketpair(unix.AF_UNIX, unix.SOCK_STREAM, 0)
170+
if err != nil {
171+
optR.Close()
172+
optW.Close()
173+
return fmt.Errorf("splitfdstream socketpair: %w", err)
174+
}
175+
parentSockFile := os.NewFile(uintptr(sockFDs[0]), "fd-socket-parent")
176+
childSockFile := os.NewFile(uintptr(sockFDs[1]), "fd-socket-child")
177+
178+
cmd := reexec.Command("storage-untar-splitfdstream", dest.dest, procPathForFd(rootFileDescriptor))
179+
cmd.Stdin = stream
180+
181+
// ExtraFiles: [optionsPipe(fd3), rootFD(fd4), socketEnd(fd5)]
182+
cmd.ExtraFiles = append(cmd.ExtraFiles, optR) // fd 3
183+
cmd.ExtraFiles = append(cmd.ExtraFiles, dest.root) // fd 4
184+
cmd.ExtraFiles = append(cmd.ExtraFiles, childSockFile) // fd 5
185+
186+
output := bytes.NewBuffer(nil)
187+
cmd.Stdout = output
188+
cmd.Stderr = output
189+
190+
if err := cmd.Start(); err != nil {
191+
optW.Close()
192+
optR.Close()
193+
parentSockFile.Close()
194+
childSockFile.Close()
195+
return fmt.Errorf("splitfdstream untar error on re-exec cmd: %w", err)
196+
}
197+
198+
// Close the child's end in the parent
199+
childSockFile.Close()
200+
201+
// Write TarOptions JSON to the pipe
202+
if err := json.NewEncoder(optW).Encode(options); err != nil {
203+
optW.Close()
204+
parentSockFile.Close()
205+
return fmt.Errorf("splitfdstream options json encode failed: %w", err)
206+
}
207+
optW.Close()
208+
209+
// Send FDs one-at-a-time over the socket using SCM_RIGHTS.
210+
// The child receives them lazily as it processes external chunks.
211+
parentConn, err := net.FileConn(parentSockFile)
212+
parentSockFile.Close() // FileConn dups the fd
213+
if err != nil {
214+
return fmt.Errorf("splitfdstream parent socket: %w", err)
215+
}
216+
parentUnix, ok := parentConn.(*net.UnixConn)
217+
if !ok {
218+
parentConn.Close()
219+
return fmt.Errorf("splitfdstream parent socket is not Unix")
220+
}
221+
222+
fdPasser := splitfdstream.NewFDPasser(parentUnix)
223+
for _, f := range fds {
224+
// Send one FD with a 1-byte dummy message (required by sendmsg)
225+
if err := fdPasser.SendFileDescriptors([]*os.File{f}, []byte{0}); err != nil {
226+
parentUnix.Close()
227+
return fmt.Errorf("splitfdstream send FD: %w", err)
228+
}
229+
}
230+
parentUnix.Close() // signal EOF to child
231+
232+
if err := cmd.Wait(); err != nil {
233+
// Exhaust input to avoid blocking the producer
234+
if _, discardErr := io.Copy(io.Discard, stream); discardErr != nil {
235+
return fmt.Errorf("splitfdstream unpacking failed (error: %w; output: %s)\nexhausting input failed (error: %w)", err, output, discardErr)
236+
}
237+
return fmt.Errorf("splitfdstream unpacking failed (error: %w; output: %s)", err, output)
238+
}
239+
return nil
240+
}

0 commit comments

Comments
 (0)