Skip to content

Commit dffa4ae

Browse files
authored
Generalize build copy for local testing (#1540)
1 parent ff6da23 commit dffa4ae

1 file changed

Lines changed: 215 additions & 51 deletions

File tree

  • packages/orchestrator/cmd/copy-build

packages/orchestrator/cmd/copy-build/main.go

Lines changed: 215 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,14 @@ import (
55
"errors"
66
"flag"
77
"fmt"
8+
"hash/crc32"
9+
"io"
810
"log"
911
"os"
1012
"os/exec"
13+
"path"
1114
"sort"
15+
"strings"
1216
"sync/atomic"
1317

1418
googleStorage "cloud.google.com/go/storage"
@@ -19,8 +23,64 @@ import (
1923
"github.com/e2b-dev/infra/packages/shared/pkg/storage/header"
2024
)
2125

22-
func getReferencedData(ctx context.Context, bucket *storage.GCPBucketStorageProvider, headerPath string, objectType storage.ObjectType) ([]string, error) {
23-
obj, err := bucket.OpenObject(ctx, headerPath, objectType)
26+
// Destination describes one endpoint of a copy: either a GCS object or a file
// on the local filesystem.
type Destination struct {
	// Path is the location handed to the copy tool: a "gs://bucket/object"
	// URL for GCS objects, or a filesystem path for local files.
	Path string
	// CRC is the CRC32C (Castagnoli) checksum of the content currently at
	// Path. It stays zero when nothing exists there yet.
	CRC uint32
	// isLocal is true for filesystem paths and false for GCS objects.
	isLocal bool
}
31+
32+
func NewDestinationFromObject(ctx context.Context, o *googleStorage.ObjectHandle) (*Destination, error) {
33+
var crc uint32
34+
if attrs, err := o.Attrs(ctx); err == nil {
35+
crc = attrs.CRC32C
36+
} else if !errors.Is(err, googleStorage.ErrObjectNotExist) {
37+
return nil, fmt.Errorf("failed to get object attributes: %w", err)
38+
}
39+
40+
return &Destination{
41+
Path: fmt.Sprintf("gs://%s/%s", o.BucketName(), o.ObjectName()),
42+
CRC: crc,
43+
isLocal: false,
44+
}, nil
45+
}
46+
47+
func NewDestinationFromPath(prefix, file string) (*Destination, error) {
48+
p := path.Join(prefix, file)
49+
50+
if _, err := os.Stat(p); err == nil {
51+
f, err := os.Open(p)
52+
if err != nil {
53+
return nil, fmt.Errorf("failed to open file: %w", err)
54+
}
55+
defer f.Close()
56+
57+
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
58+
_, err = io.Copy(h, f)
59+
if err != nil {
60+
return nil, fmt.Errorf("failed to copy file: %w", err)
61+
}
62+
crc := h.Sum32()
63+
64+
return &Destination{
65+
Path: p,
66+
CRC: crc,
67+
isLocal: true,
68+
}, nil
69+
}
70+
71+
return &Destination{
72+
Path: p,
73+
isLocal: true,
74+
}, nil
75+
}
76+
77+
func NewHeaderFromObject(ctx context.Context, bucketName string, headerPath string, objectType storage.ObjectType) (*header.Header, error) {
78+
b, err := storage.NewGCPBucketStorageProvider(ctx, bucketName, nil)
79+
if err != nil {
80+
return nil, fmt.Errorf("failed to create GCS bucket storage provider: %w", err)
81+
}
82+
83+
obj, err := b.OpenObject(ctx, headerPath, objectType)
2484
if err != nil {
2585
return nil, fmt.Errorf("failed to open object: %w", err)
2686
}
@@ -30,6 +90,33 @@ func getReferencedData(ctx context.Context, bucket *storage.GCPBucketStorageProv
3090
return nil, fmt.Errorf("failed to deserialize header: %w", err)
3191
}
3292

93+
return h, nil
94+
}
95+
96+
// osFileWriterToCtx wraps an *os.File so that it offers a context-aware
// WriteTo method.
type osFileWriterToCtx struct {
	f *os.File
}

// WriteTo streams the wrapped file, from its current offset, into w and
// reports the number of bytes written. The context argument is ignored.
func (fw *osFileWriterToCtx) WriteTo(_ context.Context, w io.Writer) (int64, error) {
	written, err := io.Copy(w, fw.f)

	return written, err
}
103+
104+
func NewHeaderFromPath(ctx context.Context, from, headerPath string) (*header.Header, error) {
105+
f, err := os.Open(path.Join(from, headerPath))
106+
if err != nil {
107+
return nil, fmt.Errorf("failed to open file: %w", err)
108+
}
109+
defer f.Close()
110+
111+
h, err := header.Deserialize(ctx, &osFileWriterToCtx{f: f})
112+
if err != nil {
113+
return nil, fmt.Errorf("failed to deserialize header: %w", err)
114+
}
115+
116+
return h, nil
117+
}
118+
119+
func getReferencedData(h *header.Header, objectType storage.ObjectType) ([]string, error) {
33120
builds := make(map[string]struct{})
34121

35122
for _, mapping := range h.Mapping {
@@ -58,62 +145,56 @@ func getReferencedData(ctx context.Context, bucket *storage.GCPBucketStorageProv
58145
return dataReferences, nil
59146
}
60147

61-
func copyFromBucket(ctx context.Context, from *googleStorage.ObjectHandle, to *googleStorage.ObjectHandle) (bool, error) {
62-
fromAttrs, err := from.Attrs(ctx)
63-
if err != nil {
64-
return false, fmt.Errorf("failed to check if the object exists: %w", err)
148+
func localCopy(ctx context.Context, from, to *Destination) error {
149+
command := []string{
150+
"rsync",
151+
"-aH",
152+
"--whole-file",
153+
"--mkpath",
154+
"--inplace",
155+
from.Path,
156+
to.Path,
65157
}
66158

67-
var toCrc uint32
68-
if attrs, err := to.Attrs(ctx); err == nil {
69-
toCrc = attrs.CRC32C
70-
} else if !errors.Is(err, googleStorage.ErrObjectNotExist) {
71-
return false, fmt.Errorf("failed to get object attributes: %w", err)
72-
}
159+
cmd := exec.CommandContext(ctx, command[0], command[1:]...)
73160

74-
if fromAttrs.CRC32C == toCrc && fromAttrs.CRC32C != 0 {
75-
return false, nil
76-
}
77-
78-
err = gcloudCopy(ctx, from, to)
161+
output, err := cmd.CombinedOutput()
79162
if err != nil {
80-
return false, fmt.Errorf("failed to copy object: %w", err)
163+
return fmt.Errorf("failed to copy local file (%v): %w\n%s", command, err, string(output))
81164
}
82165

83-
return true, nil
166+
return nil
84167
}
85168

86-
func gcloudCopy(ctx context.Context, from, to *googleStorage.ObjectHandle) error {
87-
fromPath := fmt.Sprintf("gs://%s/%s", from.BucketName(), from.ObjectName())
88-
toPath := fmt.Sprintf("gs://%s/%s", to.BucketName(), to.ObjectName())
89-
90-
cmd := exec.CommandContext(
91-
ctx,
169+
func gcloudCopy(ctx context.Context, from, to *Destination) error {
170+
command := []string{
92171
"gcloud",
93172
"storage",
94173
"cp",
95174
"--verbosity",
96175
"error",
97-
fromPath,
98-
toPath,
99-
)
176+
from.Path,
177+
to.Path,
178+
}
179+
180+
cmd := exec.CommandContext(ctx, command[0], command[1:]...)
100181

101182
output, err := cmd.CombinedOutput()
102183
if err != nil {
103-
return fmt.Errorf("failed to copy GCS object: %w\n%s", err, string(output))
184+
return fmt.Errorf("failed to copy GCS object (%v): %w\n%s", command, err, string(output))
104185
}
105186

106187
return nil
107188
}
108189

109190
func main() {
110191
buildId := flag.String("build", "", "build id")
111-
from := flag.String("from", "", "from bucket")
112-
to := flag.String("to", "", "to bucket")
192+
from := flag.String("from", "", "from destination")
193+
to := flag.String("to", "", "to destination")
113194

114195
flag.Parse()
115196

116-
fmt.Printf("Copying build '%s' from bucket '%s' to bucket '%s'\n", *buildId, *from, *to)
197+
fmt.Printf("Copying build '%s' from '%s' to '%s'\n", *buildId, *from, *to)
117198

118199
template := storage.TemplateFiles{
119200
BuildID: *buildId,
@@ -123,16 +204,31 @@ func main() {
123204

124205
ctx := context.Background()
125206

126-
fromBucket, err := storage.NewGCPBucketStorageProvider(ctx, *from, nil)
127-
if err != nil {
128-
log.Fatalf("failed to create GCS bucket storage provider: %s", err)
129-
}
130-
131207
var filesToCopy []string
132208

133209
// Extract all files referenced by the build memfile header
134210
buildMemfileHeaderPath := template.StorageMemfileHeaderPath()
135-
dataReferences, err := getReferencedData(ctx, fromBucket, buildMemfileHeaderPath, storage.MemfileHeaderObjectType)
211+
212+
var memfileHeader *header.Header
213+
if strings.HasPrefix(*from, "gs://") {
214+
bucketName, _ := strings.CutPrefix(*from, "gs://")
215+
216+
h, err := NewHeaderFromObject(ctx, bucketName, buildMemfileHeaderPath, storage.MemfileHeaderObjectType)
217+
if err != nil {
218+
log.Fatalf("failed to create header from object: %s", err)
219+
}
220+
221+
memfileHeader = h
222+
} else {
223+
h, err := NewHeaderFromPath(ctx, *from, buildMemfileHeaderPath)
224+
if err != nil {
225+
log.Fatalf("failed to create header from path: %s", err)
226+
}
227+
228+
memfileHeader = h
229+
}
230+
231+
dataReferences, err := getReferencedData(memfileHeader, storage.MemfileHeaderObjectType)
136232
if err != nil {
137233
log.Fatalf("failed to get referenced data: %s", err)
138234
}
@@ -142,7 +238,26 @@ func main() {
142238

143239
// Extract all files referenced by the build rootfs header
144240
buildRootfsHeaderPath := template.StorageRootfsHeaderPath()
145-
dataReferences, err = getReferencedData(ctx, fromBucket, buildRootfsHeaderPath, storage.RootFSHeaderObjectType)
241+
242+
var rootfsHeader *header.Header
243+
if strings.HasPrefix(*from, "gs://") {
244+
bucketName, _ := strings.CutPrefix(*from, "gs://")
245+
h, err := NewHeaderFromObject(ctx, bucketName, buildRootfsHeaderPath, storage.RootFSHeaderObjectType)
246+
if err != nil {
247+
log.Fatalf("failed to create header from object: %s", err)
248+
}
249+
250+
rootfsHeader = h
251+
} else {
252+
h, err := NewHeaderFromPath(ctx, *from, buildRootfsHeaderPath)
253+
if err != nil {
254+
log.Fatalf("failed to create header from path: %s", err)
255+
}
256+
257+
rootfsHeader = h
258+
}
259+
260+
dataReferences, err = getReferencedData(rootfsHeader, storage.RootFSHeaderObjectType)
146261
if err != nil {
147262
log.Fatalf("failed to get referenced data: %s", err)
148263
}
@@ -175,26 +290,75 @@ func main() {
175290

176291
for _, file := range filesToCopy {
177292
errgroup.Go(func() error {
178-
fmt.Printf("+ copying '%s'\n", file)
293+
var fromDestination *Destination
294+
if strings.HasPrefix(*from, "gs://") {
295+
bucketName, _ := strings.CutPrefix(*from, "gs://")
296+
fromObject := googleStorageClient.Bucket(bucketName).Object(file)
297+
d, destErr := NewDestinationFromObject(ctx, fromObject)
298+
if destErr != nil {
299+
return fmt.Errorf("failed to create destination from object: %w", destErr)
300+
}
301+
302+
fromDestination = d
303+
} else {
304+
d, destErr := NewDestinationFromPath(*from, file)
305+
if destErr != nil {
306+
return fmt.Errorf("failed to create destination from path: %w", destErr)
307+
}
308+
309+
fromDestination = d
310+
}
311+
312+
var toDestination *Destination
313+
if strings.HasPrefix(*to, "gs://") {
314+
bucketName, _ := strings.CutPrefix(*to, "gs://")
315+
toObject := googleStorageClient.Bucket(bucketName).Object(file)
316+
d, destErr := NewDestinationFromObject(ctx, toObject)
317+
if destErr != nil {
318+
return fmt.Errorf("failed to create destination from object: %w", destErr)
319+
}
179320

180-
fromObject := googleStorageClient.Bucket(*from).Object(file)
181-
toObject := googleStorageClient.Bucket(*to).Object(file)
321+
toDestination = d
322+
} else {
323+
d, destErr := NewDestinationFromPath(*to, file)
324+
if destErr != nil {
325+
return fmt.Errorf("failed to create destination from path: %w", destErr)
326+
}
182327

183-
copied, err := copyFromBucket(ctx, fromObject, toObject)
184-
if err != nil {
185-
fmt.Fprintf(os.Stderr, "- failed to copy '%s': %s\n", file, err)
328+
toDestination = d
186329

187-
return err
330+
mkdirErr := os.MkdirAll(path.Dir(toDestination.Path), 0o755)
331+
if mkdirErr != nil {
332+
return fmt.Errorf("failed to create directory: %w", mkdirErr)
333+
}
188334
}
189335

190-
done.Add(1)
336+
fmt.Printf("+ copying '%s' to '%s'\n", fromDestination.Path, toDestination.Path)
337+
338+
if fromDestination.CRC == toDestination.CRC && fromDestination.CRC != 0 {
339+
fmt.Printf("-> [%d/%d] '%s' already exists, skipping\n", done.Load(), len(filesToCopy), toDestination.Path)
191340

192-
if copied {
193-
fmt.Printf("-> [%d/%d] '%s' copied\n", done.Load(), len(filesToCopy), file)
341+
done.Add(1)
342+
343+
return nil
344+
}
345+
346+
if fromDestination.isLocal && toDestination.isLocal {
347+
err := localCopy(ctx, fromDestination, toDestination)
348+
if err != nil {
349+
return fmt.Errorf("failed to copy local file: %w", err)
350+
}
194351
} else {
195-
fmt.Printf("-> [%d/%d] '%s' already exists, skipping\n", done.Load(), len(filesToCopy), file)
352+
err := gcloudCopy(ctx, fromDestination, toDestination)
353+
if err != nil {
354+
return fmt.Errorf("failed to copy GCS object: %w", err)
355+
}
196356
}
197357

358+
done.Add(1)
359+
360+
fmt.Printf("-> [%d/%d] '%s' copied\n", done.Load(), len(filesToCopy), toDestination.Path)
361+
198362
return nil
199363
})
200364
}
@@ -203,5 +367,5 @@ func main() {
203367
log.Fatalf("failed to copy files: %s", err)
204368
}
205369

206-
fmt.Printf("Build '%s' copied to bucket '%s'\n", *buildId, *to)
370+
fmt.Printf("Build '%s' copied to '%s'\n", *buildId, *to)
207371
}

0 commit comments

Comments
 (0)