Skip to content

Commit 048ae83

Browse files
committed
fix(compression): PR #2034 review — size, timer, validate, CI
- peerserver/seekable + localDiff: use logical cache.Size(), not on-disk FileSize() (8x-inflated formula broke V3/V4 P2P resume). - storage_google.StoreFile: one outer WriteFromFileSystem timer covering all branches; drop the shadowed OneShot; tag with compression.type/level. - Privatize ResolveCompressConfig into sandbox; validate frame size against the real per-file block size from the snapshot diff header. - CI: add lz4 matrix; propagate COMPRESS_* to the base-template build via .env.test so the fixture matches the services.
1 parent 45ec070 commit 048ae83

8 files changed

Lines changed: 144 additions & 88 deletions

File tree

.github/actions/build-sandbox-template/action.yml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,24 @@
11
name: "Build Sandbox Template"
22
description: "Builds the Firecracker sandbox template."
33

4+
inputs:
5+
compress_enabled:
6+
description: "Enable compression (true/false)"
7+
required: false
8+
default: "false"
9+
compress_type:
10+
description: "Compression type (zstd, lz4)"
11+
required: false
12+
default: ""
13+
compress_level:
14+
description: "Compression level (zstd: 1=fastest, 2=default; lz4: 0)"
15+
required: false
16+
default: ""
17+
compress_workers:
18+
description: "Number of frame encode workers"
19+
required: false
20+
default: ""
21+
422
runs:
523
using: "composite"
624
steps:
@@ -9,6 +27,10 @@ runs:
927
TEMPLATE_ID: "2j6ly824owf4awgai1xo"
1028
KERNEL_VERSION: "vmlinux-6.1.158"
1129
FIRECRACKER_VERSION: "v1.14.1_458ca91"
30+
COMPRESS_ENABLED: ${{ inputs.compress_enabled }}
31+
COMPRESS_TYPE: ${{ inputs.compress_type }}
32+
COMPRESS_LEVEL: ${{ inputs.compress_level }}
33+
COMPRESS_FRAME_ENCODE_WORKERS: ${{ inputs.compress_workers }}
1234
run: |
1335
# Generate a unique build ID for the template for this run
1436
export BUILD_ID=$(uuidgen)
@@ -17,6 +39,10 @@ runs:
1739
1840
echo "TESTS_SANDBOX_TEMPLATE_ID=${TEMPLATE_ID}" >> .env.test
1941
echo "TESTS_SANDBOX_BUILD_ID=${BUILD_ID}" >> .env.test
42+
echo "COMPRESS_ENABLED=${COMPRESS_ENABLED}" >> .env.test
43+
echo "COMPRESS_TYPE=${COMPRESS_TYPE}" >> .env.test
44+
echo "COMPRESS_LEVEL=${COMPRESS_LEVEL}" >> .env.test
45+
echo "COMPRESS_FRAME_ENCODE_WORKERS=${COMPRESS_FRAME_ENCODE_WORKERS}" >> .env.test
2046
2147
sudo -E make -C packages/orchestrator build-template \
2248
ARTIFACTS_REGISTRY_PROVIDER=Local \

.github/workflows/integration_tests.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ jobs:
3030
compress_type: "zstd"
3131
compress_level: "1"
3232
compress_workers: "8"
33+
- name: lz4
34+
compress_enabled: "true"
35+
compress_type: "lz4"
36+
compress_level: "0"
37+
compress_workers: "8"
3338
env:
3439
# Surfaced as env so upload steps can gate on presence (skipped on fork PRs).
3540
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
@@ -46,6 +51,11 @@ jobs:
4651

4752
- name: Build Template
4853
uses: ./.github/actions/build-sandbox-template
54+
with:
55+
compress_enabled: ${{ matrix.compress_enabled }}
56+
compress_type: ${{ matrix.compress_type }}
57+
compress_level: ${{ matrix.compress_level }}
58+
compress_workers: ${{ matrix.compress_workers }}
4959

5060
- name: Start Services
5161
uses: ./.github/actions/start-services

packages/orchestrator/Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,10 @@ build-template: fetch-busybox
135135
GCP_PROJECT_ID=$(GCP_PROJECT_ID) \
136136
GCP_DOCKER_REPOSITORY_NAME=$(GCP_DOCKER_REPOSITORY_NAME) \
137137
GCP_REGION=$(GCP_REGION) \
138+
COMPRESS_ENABLED=$(COMPRESS_ENABLED) \
139+
COMPRESS_TYPE=$(COMPRESS_TYPE) \
140+
COMPRESS_LEVEL=$(COMPRESS_LEVEL) \
141+
COMPRESS_FRAME_ENCODE_WORKERS=$(COMPRESS_FRAME_ENCODE_WORKERS) \
138142
ENVIRONMENT=local \
139143
go run cmd/create-build/main.go \
140144
-template $(TEMPLATE_ID) \

packages/orchestrator/pkg/sandbox/build/local_diff.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func (b *localDiff) Slice(_ context.Context, off, length int64, _ *storage.Frame
124124
}
125125

126126
func (b *localDiff) Size(_ context.Context) (int64, error) {
127-
return b.FileSize()
127+
return b.cache.Size()
128128
}
129129

130130
func (b *localDiff) FileSize() (int64, error) {

packages/orchestrator/pkg/sandbox/build_upload.go

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77

88
"github.com/google/uuid"
9+
"github.com/launchdarkly/go-sdk-common/v3/ldcontext"
910

1011
"github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/build"
1112
"github.com/e2b-dev/infra/packages/shared/pkg/featureflags"
@@ -36,14 +37,23 @@ func NewUpload(
3637
useCase string,
3738
objectMetadata storage.ObjectMetadata,
3839
) (*Upload, error) {
40+
mem, err := resolveCompressConfig(ctx, cfg, ff, storage.MemfileName, snap.MemfileDiffHeader.Metadata.BlockSize, useCase)
41+
if err != nil {
42+
return nil, fmt.Errorf("resolve memfile compress config: %w", err)
43+
}
44+
root, err := resolveCompressConfig(ctx, cfg, ff, storage.RootfsName, snap.RootfsDiffHeader.Metadata.BlockSize, useCase)
45+
if err != nil {
46+
return nil, fmt.Errorf("resolve rootfs compress config: %w", err)
47+
}
48+
3949
u := &Upload{
4050
buildID: snap.BuildID,
4151
snap: snap,
4252
paths: storage.Paths{BuildID: snap.BuildID.String()},
4353
uploads: uploads,
4454
store: store,
45-
mem: storage.ResolveCompressConfig(ctx, cfg, ff, storage.MemfileName, useCase),
46-
root: storage.ResolveCompressConfig(ctx, cfg, ff, storage.RootfsName, useCase),
55+
mem: mem,
56+
root: root,
4757
objectMetadata: objectMetadata,
4858
}
4959

@@ -97,3 +107,79 @@ func (u *Upload) publish(ctx context.Context, t build.DiffType, h *headers.Heade
97107

98108
return nil
99109
}
110+
111+
// resolveCompressConfig returns the effective compression config for a given
112+
// file type and use case. Feature flags override the base config when active.
113+
// Returns zero-value CompressConfig when compression is disabled.
114+
//
115+
// fileType and useCase are added to the LD evaluation context so that
116+
// LaunchDarkly targeting rules can differentiate (e.g. compress memfile
117+
// but not rootfs, or compress builds but not pauses). blockSize is the
118+
// in-VM read granularity for this fileType (from the diff header) and
119+
// constrains the legal frame sizes — see validateCompressConfig.
120+
//
121+
// The resolved config is validated; an invalid env or LD-derived config
122+
// surfaces as an error so the upload fails fast rather than streaming with
123+
// a misconfigured frame size.
124+
func resolveCompressConfig(ctx context.Context, base storage.CompressConfig, ff *featureflags.Client, fileType string, blockSize uint64, useCase string) (storage.CompressConfig, error) {
125+
resolved := base
126+
127+
if ff != nil {
128+
var extra []ldcontext.Context
129+
if fileType != "" {
130+
extra = append(extra, featureflags.CompressFileTypeContext(fileType))
131+
}
132+
if useCase != "" {
133+
extra = append(extra, featureflags.CompressUseCaseContext(useCase))
134+
}
135+
ctx = featureflags.AddToContext(ctx, extra...)
136+
137+
v := ff.JSONFlag(ctx, featureflags.CompressConfigFlag).AsValueMap()
138+
139+
if v.Get("compressBuilds").BoolValue() {
140+
ct := v.Get("compressionType").StringValue()
141+
ldCfg := storage.CompressConfig{
142+
Enabled: true,
143+
Type: ct,
144+
Level: v.Get("compressionLevel").IntValue(),
145+
FrameSizeKB: v.Get("frameSizeKB").IntValue(),
146+
MinPartSizeMB: v.Get("minPartSizeMB").IntValue(),
147+
FrameEncodeWorkers: v.Get("frameEncodeWorkers").IntValue(),
148+
EncoderConcurrency: v.Get("encoderConcurrency").IntValue(),
149+
}
150+
if ldCfg.CompressionType() != storage.CompressionNone {
151+
resolved = ldCfg
152+
}
153+
}
154+
}
155+
156+
if !resolved.IsCompressionEnabled() {
157+
return storage.CompressConfig{}, nil
158+
}
159+
160+
if err := validateCompressConfig(resolved, blockSize); err != nil {
161+
return storage.CompressConfig{}, err
162+
}
163+
164+
return resolved, nil
165+
}
166+
167+
// validateCompressConfig checks that the resolved config is internally
168+
// consistent for the given block size. Frame size must be a positive multiple
169+
// of blockSize so that every block-sized read served by the chunker lies
170+
// inside one frame — otherwise Chunker.fetch fetches only the start frame and
171+
// cache.sliceDirect returns uninitialized mmap bytes for the tail.
172+
func validateCompressConfig(c storage.CompressConfig, blockSize uint64) error {
173+
fs := c.FrameSize()
174+
if fs <= 0 {
175+
return fmt.Errorf("frame size must be positive, got %d KB", c.FrameSizeKB)
176+
}
177+
if blockSize == 0 {
178+
return errors.New("block size must be positive")
179+
}
180+
if uint64(fs)%blockSize != 0 {
181+
return fmt.Errorf("frame size (%d) must be a multiple of block size (%d)", fs, blockSize)
182+
}
183+
184+
return nil
185+
}

packages/orchestrator/pkg/sandbox/template/peerserver/seekable.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ type seekableSource struct {
1818
diff build.Diff
1919
}
2020

21-
func (f *seekableSource) Size(_ context.Context) (int64, error) {
22-
return f.diff.FileSize()
21+
func (f *seekableSource) Size(ctx context.Context) (int64, error) {
22+
return f.diff.Size(ctx)
2323
}
2424

2525
func (f *seekableSource) Exists(_ context.Context) (bool, error) {
Lines changed: 2 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,5 @@
11
package storage
22

3-
import (
4-
"context"
5-
"fmt"
6-
7-
"github.com/launchdarkly/go-sdk-common/v3/ldcontext"
8-
9-
"github.com/e2b-dev/infra/packages/shared/pkg/featureflags"
10-
)
11-
123
const (
134
// DefaultCompressFrameSize is the default uncompressed size of each
145
// compression frame (2 MiB). Overridable via CompressConfig.FrameSizeKB.
@@ -30,8 +21,8 @@ const (
3021
)
3122

3223
// CompressConfig is the base compression configuration, loaded from environment
33-
// variables at startup. Feature flags can override individual fields at runtime
34-
// via ResolveCompressConfig. Zero value means compression disabled.
24+
// variables at startup. Feature flags may override individual fields at runtime
25+
// at the upload boundary. Zero value means compression disabled.
3526
type CompressConfig struct {
3627
Enabled bool `env:"COMPRESS_ENABLED" envDefault:"false"`
3728
Type string `env:"COMPRESS_TYPE" envDefault:""`
@@ -70,63 +61,3 @@ func (c CompressConfig) MinPartSize() int64 {
7061
func (c CompressConfig) IsCompressionEnabled() bool {
7162
return c.Enabled && c.CompressionType() != CompressionNone
7263
}
73-
74-
// Validate checks that the config is internally consistent.
75-
func (c CompressConfig) Validate() error {
76-
if !c.IsCompressionEnabled() {
77-
return nil
78-
}
79-
80-
fs := c.FrameSize()
81-
if fs <= 0 {
82-
return fmt.Errorf("frame size must be positive, got %d KB", c.FrameSizeKB)
83-
}
84-
if MemoryChunkSize%fs != 0 && fs%MemoryChunkSize != 0 {
85-
return fmt.Errorf("frame size (%d) must be a divisor or multiple of MemoryChunkSize (%d)", fs, MemoryChunkSize)
86-
}
87-
88-
return nil
89-
}
90-
91-
// ResolveCompressConfig returns the effective compression config for a given
92-
// file type and use case. Feature flags override the base config when active.
93-
// Returns zero-value CompressConfig when compression is disabled.
94-
//
95-
// fileType and useCase are added to the LD evaluation context so that
96-
// LaunchDarkly targeting rules can differentiate (e.g. compress memfile
97-
// but not rootfs, or compress builds but not pauses).
98-
func ResolveCompressConfig(ctx context.Context, base CompressConfig, ff *featureflags.Client, fileType, useCase string) CompressConfig {
99-
if ff != nil {
100-
var extra []ldcontext.Context
101-
if fileType != "" {
102-
extra = append(extra, featureflags.CompressFileTypeContext(fileType))
103-
}
104-
if useCase != "" {
105-
extra = append(extra, featureflags.CompressUseCaseContext(useCase))
106-
}
107-
ctx = featureflags.AddToContext(ctx, extra...)
108-
109-
v := ff.JSONFlag(ctx, featureflags.CompressConfigFlag).AsValueMap()
110-
111-
if v.Get("compressBuilds").BoolValue() {
112-
ct := v.Get("compressionType").StringValue()
113-
if parseCompressionType(ct) != CompressionNone {
114-
return CompressConfig{
115-
Enabled: true,
116-
Type: ct,
117-
Level: v.Get("compressionLevel").IntValue(),
118-
FrameSizeKB: v.Get("frameSizeKB").IntValue(),
119-
MinPartSizeMB: v.Get("minPartSizeMB").IntValue(),
120-
FrameEncodeWorkers: v.Get("frameEncodeWorkers").IntValue(),
121-
EncoderConcurrency: v.Get("encoderConcurrency").IntValue(),
122-
}
123-
}
124-
}
125-
}
126-
127-
if !base.IsCompressionEnabled() {
128-
return CompressConfig{}
129-
}
130-
131-
return base
132-
}

packages/shared/pkg/storage/storage_google.go

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,11 @@ const (
4646
defaultGCSEnableDirectPath = false
4747
gcloudDefaultUploadConcurrency = 16
4848

49-
gcsOperationAttr = "operation"
50-
gcsOperationAttrWrite = "Write"
51-
gcsOperationAttrWriteFromFileSystem = "WriteFromFileSystem"
52-
gcsOperationAttrWriteFromFileSystemOneShot = "WriteFromFileSystemOneShot"
53-
gcsOperationAttrWriteTo = "WriteTo"
54-
gcsOperationAttrSize = "Size"
49+
gcsOperationAttr = "operation"
50+
gcsOperationAttrWrite = "Write"
51+
gcsOperationAttrWriteFromFileSystem = "WriteFromFileSystem"
52+
gcsOperationAttrWriteTo = "WriteTo"
53+
gcsOperationAttrSize = "Size"
5554
// gcsOperationAttrReadAt tags GCS read timer metrics for OpenRangeReader
5655
// (the method was renamed from ReadAt; value kept for dashboard compatibility).
5756
gcsOperationAttrReadAt = "ReadAt"
@@ -383,8 +382,14 @@ func (o *gcpObject) StoreFile(ctx context.Context, path string, opts ...PutOptio
383382
return nil, [32]byte{}, fmt.Errorf("failed to get file size: %w", err)
384383
}
385384

385+
cfg := CompressConfigFromOpts(putOpts)
386+
387+
// Tag the upload timer with the compression mode so dashboards can split
388+
// duration/throughput by codec and level. Type is "none" when disabled.
386389
timer := googleWriteTimerFactory.Begin(
387390
attribute.String(gcsOperationAttr, gcsOperationAttrWriteFromFileSystem),
391+
attribute.String("compression.type", cfg.CompressionType().String()),
392+
attribute.Int("compression.level", cfg.Level),
388393
)
389394

390395
maxConcurrency := gcloudDefaultUploadConcurrency
@@ -403,8 +408,6 @@ func (o *gcpObject) StoreFile(ctx context.Context, path string, opts ...PutOptio
403408
maxConcurrency = o.limiter.GCloudMaxTasks(ctx)
404409
}
405410

406-
cfg := CompressConfigFromOpts(putOpts)
407-
408411
// Compressed uploads always go through the multipart compressed path,
409412
// regardless of file size.
410413
if cfg.IsCompressionEnabled() {
@@ -433,10 +436,6 @@ func (o *gcpObject) StoreFile(ctx context.Context, path string, opts ...PutOptio
433436
// If the file is too small, the overhead of writing in parallel isn't worth the effort.
434437
// Write it in one shot instead.
435438
if fileInfo.Size() < gcpMultipartUploadChunkSize {
436-
timer := googleWriteTimerFactory.Begin(
437-
attribute.String(gcsOperationAttr, gcsOperationAttrWriteFromFileSystemOneShot),
438-
)
439-
440439
data, err := os.ReadFile(path)
441440
if err != nil {
442441
timer.Failure(ctx, 0)

0 commit comments

Comments (0)