Skip to content

Commit 85a7924

Browse files
committed
storage/cmd: new commands json-rpc-server and apply-splitfdstream
Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
1 parent 2ce414b commit 85a7924

1 file changed

Lines changed: 215 additions & 0 deletions

File tree

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
//go:build linux
2+
3+
package main
4+
5+
import (
6+
"bytes"
7+
"fmt"
8+
"os"
9+
"os/signal"
10+
"path/filepath"
11+
"syscall"
12+
13+
"go.podman.io/storage"
14+
graphdriver "go.podman.io/storage/drivers"
15+
"go.podman.io/storage/pkg/archive"
16+
"go.podman.io/storage/pkg/chunked"
17+
"go.podman.io/storage/pkg/mflag"
18+
"go.podman.io/storage/pkg/splitfdstream"
19+
)
20+
21+
const defaultJSONRPCSocket = "json-rpc.sock"
22+
23+
var (
24+
splitfdstreamSocket = ""
25+
applyFdstreamSocket = ""
26+
applyFdstreamParent = ""
27+
applyFdstreamMountLabel = ""
28+
)
29+
30+
// splitFDStreamDiffer implements graphdriver.Differ for splitfdstream data
31+
type splitFDStreamDiffer struct {
32+
streamData []byte
33+
fds []*os.File
34+
store storage.Store
35+
}
36+
37+
func (d *splitFDStreamDiffer) ApplyDiff(dest string, options *archive.TarOptions, differOpts *graphdriver.DifferOptions) (graphdriver.DriverWithDifferOutput, error) {
38+
driver, err := d.store.GraphDriver()
39+
if err != nil {
40+
return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("failed to get graph driver: %w", err)
41+
}
42+
43+
splitDriver, ok := driver.(splitfdstream.SplitFDStreamDriver)
44+
if !ok {
45+
return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("driver %s does not support splitfdstream", driver.String())
46+
}
47+
48+
opts := &splitfdstream.ApplySplitFDStreamOpts{
49+
Stream: bytes.NewReader(d.streamData),
50+
FileDescriptors: d.fds,
51+
StagingDir: dest,
52+
}
53+
54+
size, err := splitDriver.ApplySplitFDStream(opts)
55+
if err != nil {
56+
return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("failed to apply splitfdstream to staging dir %s: %w", dest, err)
57+
}
58+
59+
return graphdriver.DriverWithDifferOutput{
60+
Target: dest,
61+
Size: size,
62+
}, nil
63+
}
64+
65+
func (d *splitFDStreamDiffer) Close() error {
66+
return nil
67+
}
68+
69+
func splitfdstreamServer(flags *mflag.FlagSet, action string, m storage.Store, args []string) (int, error) {
70+
driver, err := m.GraphDriver()
71+
if err != nil {
72+
return 1, fmt.Errorf("failed to get graph driver: %w", err)
73+
}
74+
75+
// Create digest lookup function using the chunked cache
76+
digestLookup := func(digest string) (*os.File, int64, int64, error) {
77+
result, err := chunked.FindFileByDigest(m, digest)
78+
if err != nil {
79+
return nil, 0, 0, err
80+
}
81+
if result == nil {
82+
return nil, 0, 0, nil
83+
}
84+
return result.File, result.Offset, result.Size, nil
85+
}
86+
87+
server := splitfdstream.NewJSONRPCServer(driver, digestLookup)
88+
89+
socketPath := splitfdstreamSocket
90+
if socketPath == "" {
91+
socketPath = filepath.Join(m.RunRoot(), defaultJSONRPCSocket)
92+
}
93+
94+
if err := server.Start(socketPath); err != nil {
95+
return 1, fmt.Errorf("failed to start server: %w", err)
96+
}
97+
defer server.Stop()
98+
99+
fmt.Printf("%s\n", socketPath)
100+
101+
// Wait for interrupt signal
102+
sigCh := make(chan os.Signal, 1)
103+
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
104+
<-sigCh
105+
106+
return 0, nil
107+
}
108+
109+
func applySplitfdstream(flags *mflag.FlagSet, action string, m storage.Store, args []string) (int, error) {
110+
layerID := args[0]
111+
112+
socketPath := applyFdstreamSocket
113+
if socketPath == "" {
114+
socketPath = filepath.Join(m.RunRoot(), defaultJSONRPCSocket)
115+
}
116+
117+
defer func() {
118+
if _, err := m.Shutdown(false); err != nil {
119+
fmt.Fprintf(os.Stderr, "warning: failed to shutdown storage: %v\n", err)
120+
}
121+
}()
122+
123+
client, err := splitfdstream.NewJSONRPCClient(socketPath)
124+
if err != nil {
125+
return 1, fmt.Errorf("failed to connect to server: %w", err)
126+
}
127+
defer client.Close()
128+
129+
// Get splitfdstream data from remote server
130+
streamData, fds, err := client.GetSplitFDStream(layerID, "")
131+
if err != nil {
132+
return 1, fmt.Errorf("failed to get splitfdstream from server: %w", err)
133+
}
134+
135+
// Close received FDs when done
136+
defer func() {
137+
for _, fd := range fds {
138+
fd.Close()
139+
}
140+
}()
141+
142+
// Create a custom differ for splitfdstream data
143+
differ := &splitFDStreamDiffer{
144+
streamData: streamData,
145+
fds: fds,
146+
store: m,
147+
}
148+
defer differ.Close()
149+
150+
// Prepare the staged layer
151+
diffOptions := &graphdriver.ApplyDiffWithDifferOpts{}
152+
diffOutput, err := m.PrepareStagedLayer(diffOptions, differ)
153+
if err != nil {
154+
return 1, fmt.Errorf("failed to prepare staged layer: %w", err)
155+
}
156+
157+
// Apply the staged layer to create the final layer
158+
applyArgs := storage.ApplyStagedLayerOptions{
159+
ID: layerID,
160+
ParentLayer: applyFdstreamParent,
161+
MountLabel: applyFdstreamMountLabel,
162+
Writeable: false,
163+
LayerOptions: &storage.LayerOptions{},
164+
DiffOutput: diffOutput,
165+
DiffOptions: diffOptions,
166+
}
167+
168+
layer, err := m.ApplyStagedLayer(applyArgs)
169+
if err != nil {
170+
// Clean up the staged layer on failure
171+
if cleanupErr := m.CleanupStagedLayer(diffOutput); cleanupErr != nil {
172+
fmt.Fprintf(os.Stderr, "warning: failed to cleanup staged layer: %v\n", cleanupErr)
173+
}
174+
return 1, fmt.Errorf("failed to apply staged layer: %w", err)
175+
}
176+
177+
// Output the result
178+
if jsonOutput {
179+
return outputJSON(map[string]interface{}{"id": layer.ID, "size": diffOutput.Size})
180+
}
181+
fmt.Printf("%s\n", layer.ID)
182+
return 0, nil
183+
}
184+
185+
func init() {
186+
commands = append(commands, command{
187+
names: []string{"json-rpc-server"},
188+
optionsHelp: "[options]",
189+
usage: "Start a JSON-RPC server",
190+
minArgs: 0,
191+
maxArgs: 0,
192+
action: splitfdstreamServer,
193+
addFlags: func(flags *mflag.FlagSet, cmd *command) {
194+
flags.StringVar(&splitfdstreamSocket, []string{"-socket"}, "",
195+
"Path to UNIX socket")
196+
},
197+
})
198+
commands = append(commands, command{
199+
names: []string{"apply-splitfdstream"},
200+
optionsHelp: "[options] layerID",
201+
usage: "Fetch a layer from remote server and apply it locally",
202+
minArgs: 1,
203+
maxArgs: 1,
204+
action: applySplitfdstream,
205+
addFlags: func(flags *mflag.FlagSet, cmd *command) {
206+
flags.StringVar(&applyFdstreamSocket, []string{"-socket"}, "",
207+
"Path to remote UNIX socket")
208+
flags.StringVar(&applyFdstreamParent, []string{"-parent"}, "",
209+
"Parent layer ID for the new layer")
210+
flags.StringVar(&applyFdstreamMountLabel, []string{"-mount-label"}, "",
211+
"SELinux mount label for the layer")
212+
flags.BoolVar(&jsonOutput, []string{"-json", "j"}, jsonOutput, "Prefer JSON output")
213+
},
214+
})
215+
}

0 commit comments

Comments
 (0)