Skip to content

Commit 9eeada9

Browse files
committed
Skip shared Firecracker standby compression
1 parent 2b58e51 commit 9eeada9

7 files changed

Lines changed: 276 additions & 4 deletions

lib/instances/manager.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ type manager struct {
131131
compressionMu sync.Mutex
132132
compressionJobs map[string]*compressionJob
133133
compressionTimerFactory func(time.Duration) compressionTimer
134+
sharingInspector func(string) (snapshotMemorySharing, error)
134135
nativeCodecMu sync.Mutex
135136
nativeCodecPaths map[string]string
136137
imageUsageRecorder ImageUsageRecorder

lib/instances/metrics.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ const (
3030
snapshotCompressionResultFailed snapshotCompressionResult = "failed"
3131
)
3232

33+
type snapshotCompressionSkipReason string
34+
35+
const (
36+
snapshotCompressionSkipReasonNone snapshotCompressionSkipReason = "none"
37+
snapshotCompressionSkipReasonSharedExtents snapshotCompressionSkipReason = "shared_extents"
38+
)
39+
3340
type snapshotCompressionWaitOutcome string
3441

3542
const (
@@ -569,14 +576,15 @@ func snapshotCompressionAttributes(hvType hypervisor.Type, algorithm snapshotsto
569576
return attrs
570577
}
571578

572-
func (m *manager) recordSnapshotCompressionJob(ctx context.Context, target compressionTarget, result snapshotCompressionResult, compressionStart *time.Time, uncompressedSize, compressedSize int64) {
579+
func (m *manager) recordSnapshotCompressionJob(ctx context.Context, target compressionTarget, result snapshotCompressionResult, skipReason snapshotCompressionSkipReason, compressionStart *time.Time, uncompressedSize, compressedSize int64) {
573580
if m.metrics == nil {
574581
return
575582
}
576583

577584
attrs := snapshotCompressionAttributes(target.HypervisorType, target.Policy.Algorithm, target.Source)
578585
attrsWithResult := append([]attribute.KeyValue{}, attrs...)
579586
attrsWithResult = append(attrsWithResult, attribute.String("result", string(result)))
587+
attrsWithResult = append(attrsWithResult, attribute.String("reason", string(skipReason)))
580588

581589
m.metrics.snapshotCompressionJobsTotal.Add(ctx, 1, metric.WithAttributes(attrsWithResult...))
582590
if compressionStart != nil {

lib/instances/metrics_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@ func TestSnapshotCompressionMetrics_RecordAndObserve(t *testing.T) {
6161

6262
target := m.compressionJobs["job-1"].target
6363
startedAt := time.Now().Add(-2 * time.Second)
64-
m.recordSnapshotCompressionJob(t.Context(), target, snapshotCompressionResultSuccess, &startedAt, 1024, 256)
65-
m.recordSnapshotCompressionJob(t.Context(), m.compressionJobs["job-2"].target, snapshotCompressionResultSkipped, nil, 0, 0)
64+
m.recordSnapshotCompressionJob(t.Context(), target, snapshotCompressionResultSuccess, snapshotCompressionSkipReasonNone, &startedAt, 1024, 256)
65+
m.recordSnapshotCompressionJob(t.Context(), m.compressionJobs["job-2"].target, snapshotCompressionResultSkipped, snapshotCompressionSkipReasonSharedExtents, nil, 0, 0)
6666
m.recordSnapshotCompressionWait(t.Context(), m.compressionJobs["job-2"].target, snapshotCompressionWaitOutcomeSkipped, time.Now().Add(-1500*time.Millisecond))
6767
m.recordSnapshotCodecFallback(t.Context(), snapshotstore.SnapshotCompressionAlgorithmLz4, snapshotCodecOperationCompress, snapshotCodecFallbackReasonMissingBinary)
6868
m.recordSnapshotRestoreMemoryPrepare(t.Context(), hypervisor.TypeCloudHypervisor, snapshotMemoryPreparePathRaw, snapshotCompressionResultSuccess, time.Now().Add(-250*time.Millisecond))
@@ -96,11 +96,13 @@ func TestSnapshotCompressionMetrics_RecordAndObserve(t *testing.T) {
9696
assert.Equal(t, "cloud-hypervisor", metricLabel(t, point.Attributes, "hypervisor"))
9797
assert.Equal(t, "lz4", metricLabel(t, point.Attributes, "algorithm"))
9898
assert.Equal(t, "standby", metricLabel(t, point.Attributes, "source"))
99+
assert.Equal(t, "none", metricLabel(t, point.Attributes, "reason"))
99100
case "skipped":
100101
assert.Equal(t, int64(1), point.Value)
101102
assert.Equal(t, "qemu", metricLabel(t, point.Attributes, "hypervisor"))
102103
assert.Equal(t, "zstd", metricLabel(t, point.Attributes, "algorithm"))
103104
assert.Equal(t, "standby", metricLabel(t, point.Attributes, "source"))
105+
assert.Equal(t, "shared_extents", metricLabel(t, point.Attributes, "reason"))
104106
default:
105107
t.Fatalf("unexpected compression job result datapoint: %s", metricLabel(t, point.Attributes, "result"))
106108
}
@@ -546,6 +548,7 @@ func TestEnsureSnapshotMemoryReadySkipsPendingCompressionWithoutPreemptionMetric
546548
require.True(t, ok)
547549
require.Len(t, jobs.DataPoints, 1)
548550
assert.Equal(t, "skipped", metricLabel(t, jobs.DataPoints[0].Attributes, "result"))
551+
assert.Equal(t, "none", metricLabel(t, jobs.DataPoints[0].Attributes, "reason"))
549552

550553
waitMetric := findMetric(t, rm, "hypeman_snapshot_compression_wait_duration_seconds")
551554
waitDurations, ok := waitMetric.Data.(metricdata.Histogram[float64])

lib/instances/snapshot_compression.go

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,12 @@ type nativeCodecRuntime struct {
8585
commandContext func(ctx context.Context, name string, arg ...string) *exec.Cmd
8686
}
8787

88+
type snapshotMemorySharing struct {
89+
SharedBytes int64
90+
PrivateBytes int64
91+
Unknown bool
92+
}
93+
8894
func (r nativeCodecRuntime) lookPathFunc() func(string) (string, error) {
8995
if r.lookPath != nil {
9096
return r.lookPath
@@ -511,6 +517,35 @@ func runWithNativeFallback(ctx context.Context, runtime nativeCodecRuntime, algo
511517
return nil
512518
}
513519

520+
func (m *manager) inspectSnapshotMemorySharing(rawPath string) (snapshotMemorySharing, error) {
521+
if m != nil && m.sharingInspector != nil {
522+
return m.sharingInspector(rawPath)
523+
}
524+
return inspectSnapshotMemorySharing(rawPath)
525+
}
526+
527+
func (m *manager) shouldSkipSnapshotCompressionForSharedMemory(ctx context.Context, target compressionTarget, rawPath string) (snapshotMemorySharing, bool) {
528+
if target.HypervisorType != hypervisor.TypeFirecracker || target.Source != snapshotCompressionSourceStandby {
529+
return snapshotMemorySharing{}, false
530+
}
531+
532+
sharing, err := m.inspectSnapshotMemorySharing(rawPath)
533+
if err != nil {
534+
logger.FromContext(ctx).WarnContext(ctx, "failed to inspect snapshot memory sharing; continuing compression",
535+
"owner_id", target.OwnerID,
536+
"snapshot_id", target.SnapshotID,
537+
"snapshot_dir", target.SnapshotDir,
538+
"raw_path", rawPath,
539+
"error", err,
540+
)
541+
return snapshotMemorySharing{}, false
542+
}
543+
if sharing.Unknown || sharing.SharedBytes <= 0 {
544+
return sharing, false
545+
}
546+
return sharing, true
547+
}
548+
514549
func (m *manager) startCompressionJob(ctx context.Context, target compressionTarget) {
515550
if target.Key == "" || !target.Policy.Enabled {
516551
return
@@ -539,6 +574,7 @@ func (m *manager) startCompressionJob(ctx context.Context, target compressionTar
539574

540575
go func() {
541576
result := snapshotCompressionResultSuccess
577+
skipReason := snapshotCompressionSkipReasonNone
542578
var uncompressedSize int64
543579
var compressedSize int64
544580
var spanErr error
@@ -547,7 +583,7 @@ func (m *manager) startCompressionJob(ctx context.Context, target compressionTar
547583
var compressionStart *time.Time
548584

549585
defer func() {
550-
m.recordSnapshotCompressionJob(metricsCtx, target, result, compressionStart, uncompressedSize, compressedSize)
586+
m.recordSnapshotCompressionJob(metricsCtx, target, result, skipReason, compressionStart, uncompressedSize, compressedSize)
551587
if target.Source == snapshotCompressionSourceStandby && target.SnapshotID == "" {
552588
if err := m.clearPendingStandbyCompression(context.Background(), target.OwnerID); err != nil && !errors.Is(err, ErrNotFound) {
553589
log.WarnContext(context.Background(), "failed to clear pending standby compression plan after job completion", "instance_id", target.OwnerID, "error", err)
@@ -667,6 +703,20 @@ func (m *manager) startCompressionJob(ctx context.Context, target compressionTar
667703
}
668704
return
669705
}
706+
if sharing, skip := m.shouldSkipSnapshotCompressionForSharedMemory(compressionCtx, target, rawPath); skip {
707+
result = snapshotCompressionResultSkipped
708+
skipReason = snapshotCompressionSkipReasonSharedExtents
709+
log.InfoContext(compressionCtx, "skipping standby snapshot compression because memory file has shared extents",
710+
"owner_id", target.OwnerID,
711+
"snapshot_id", target.SnapshotID,
712+
"snapshot_dir", target.SnapshotDir,
713+
"raw_path", rawPath,
714+
"hypervisor", string(target.HypervisorType),
715+
"shared_bytes", sharing.SharedBytes,
716+
"private_bytes", sharing.PrivateBytes,
717+
)
718+
return
719+
}
670720

671721
var err error
672722
uncompressedSize, compressedSize, err = compressSnapshotMemoryFileWithRuntime(jobCtx, nativeCodecRuntime{manager: m}, rawPath, target.Policy)

lib/instances/snapshot_compression_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import (
1414
snapshotstore "github.com/kernel/hypeman/lib/snapshot"
1515
"github.com/stretchr/testify/assert"
1616
"github.com/stretchr/testify/require"
17+
otelmetric "go.opentelemetry.io/otel/sdk/metric"
18+
"go.opentelemetry.io/otel/sdk/metric/metricdata"
1719
)
1820

1921
func TestNormalizeCompressionConfig(t *testing.T) {
@@ -433,6 +435,101 @@ func newSnapshotCompressionTestManager(t *testing.T) *manager {
433435
}
434436
}
435437

438+
func TestStartCompressionJobSkipsFirecrackerStandbySharedMemory(t *testing.T) {
439+
mgr := newSnapshotCompressionTestManager(t)
440+
reader := otelmetric.NewManualReader()
441+
provider := otelmetric.NewMeterProvider(otelmetric.WithReader(reader))
442+
metrics, err := newInstanceMetrics(provider.Meter("test"), nil, mgr)
443+
require.NoError(t, err)
444+
mgr.metrics = metrics
445+
446+
snapshotDir := t.TempDir()
447+
rawPath := filepath.Join(snapshotDir, "memory")
448+
require.NoError(t, os.WriteFile(rawPath, []byte("shared firecracker standby memory"), 0o644))
449+
450+
mgr.sharingInspector = func(path string) (snapshotMemorySharing, error) {
451+
assert.Equal(t, rawPath, path)
452+
return snapshotMemorySharing{SharedBytes: 1024, PrivateBytes: 512}, nil
453+
}
454+
455+
target := compressionTarget{
456+
Key: "instance:shared-firecracker",
457+
OwnerID: "shared-firecracker",
458+
SnapshotDir: snapshotDir,
459+
HypervisorType: hypervisor.TypeFirecracker,
460+
Source: snapshotCompressionSourceStandby,
461+
Policy: snapshotstore.SnapshotCompressionConfig{
462+
Enabled: true,
463+
Algorithm: snapshotstore.SnapshotCompressionAlgorithmZstd,
464+
Level: intPtr(1),
465+
},
466+
}
467+
468+
mgr.startCompressionJob(context.Background(), target)
469+
470+
require.Eventually(t, func() bool {
471+
mgr.compressionMu.Lock()
472+
defer mgr.compressionMu.Unlock()
473+
_, ok := mgr.compressionJobs[target.Key]
474+
return !ok
475+
}, time.Second, 10*time.Millisecond)
476+
477+
_, err = os.Stat(rawPath)
478+
require.NoError(t, err, "shared raw memory should remain uncompressed")
479+
_, err = os.Stat(rawPath + ".zst")
480+
require.Error(t, err)
481+
assert.True(t, os.IsNotExist(err))
482+
483+
var rm metricdata.ResourceMetrics
484+
require.NoError(t, reader.Collect(t.Context(), &rm))
485+
jobsMetric := findMetric(t, rm, "hypeman_snapshot_compression_jobs_total")
486+
jobs, ok := jobsMetric.Data.(metricdata.Sum[int64])
487+
require.True(t, ok)
488+
require.Len(t, jobs.DataPoints, 1)
489+
assert.Equal(t, "skipped", metricLabel(t, jobs.DataPoints[0].Attributes, "result"))
490+
assert.Equal(t, "shared_extents", metricLabel(t, jobs.DataPoints[0].Attributes, "reason"))
491+
}
492+
493+
func TestStartCompressionJobDoesNotSkipNonFirecrackerSharedMemory(t *testing.T) {
494+
mgr := newSnapshotCompressionTestManager(t)
495+
496+
snapshotDir := t.TempDir()
497+
rawPath := filepath.Join(snapshotDir, "memory")
498+
require.NoError(t, os.WriteFile(rawPath, []byte("cloud hypervisor memory"), 0o644))
499+
500+
mgr.sharingInspector = func(string) (snapshotMemorySharing, error) {
501+
return snapshotMemorySharing{SharedBytes: 1024}, nil
502+
}
503+
504+
target := compressionTarget{
505+
Key: "instance:shared-cloud-hypervisor",
506+
OwnerID: "shared-cloud-hypervisor",
507+
SnapshotDir: snapshotDir,
508+
HypervisorType: hypervisor.TypeCloudHypervisor,
509+
Source: snapshotCompressionSourceStandby,
510+
Policy: snapshotstore.SnapshotCompressionConfig{
511+
Enabled: true,
512+
Algorithm: snapshotstore.SnapshotCompressionAlgorithmZstd,
513+
Level: intPtr(1),
514+
},
515+
}
516+
517+
mgr.startCompressionJob(context.Background(), target)
518+
519+
require.Eventually(t, func() bool {
520+
mgr.compressionMu.Lock()
521+
defer mgr.compressionMu.Unlock()
522+
_, ok := mgr.compressionJobs[target.Key]
523+
return !ok
524+
}, time.Second, 10*time.Millisecond)
525+
526+
_, err := os.Stat(rawPath)
527+
require.Error(t, err)
528+
assert.True(t, os.IsNotExist(err))
529+
_, _, compressed := findCompressedSnapshotMemoryFile(snapshotDir)
530+
assert.True(t, compressed)
531+
}
532+
436533
func TestStartCompressionJobDelayedCancellationRecordsSkipped(t *testing.T) {
437534
t.Parallel()
438535

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
//go:build linux
2+
3+
package instances
4+
5+
import (
6+
"errors"
7+
"fmt"
8+
"os"
9+
"unsafe"
10+
11+
"golang.org/x/sys/unix"
12+
)
13+
14+
const (
15+
snapshotMemoryFsIocFiemap = 0xC020660B
16+
snapshotMemoryFiemapFlagSync = 0x00000001
17+
snapshotMemoryFiemapExtentLast = 0x00000001
18+
snapshotMemoryFiemapExtentShared = 0x00002000
19+
snapshotMemoryMaxFiemapExtents = 512
20+
)
21+
22+
type snapshotMemoryFiemapExtent struct {
23+
Logical uint64
24+
Physical uint64
25+
Length uint64
26+
Reserved64 [2]uint64
27+
Flags uint32
28+
Reserved [3]uint32
29+
}
30+
31+
type snapshotMemoryFiemap struct {
32+
Start uint64
33+
Length uint64
34+
Flags uint32
35+
MappedExtents uint32
36+
ExtentCount uint32
37+
Reserved uint32
38+
Extents [snapshotMemoryMaxFiemapExtents]snapshotMemoryFiemapExtent
39+
}
40+
41+
func inspectSnapshotMemorySharing(path string) (snapshotMemorySharing, error) {
42+
file, err := os.Open(path)
43+
if err != nil {
44+
return snapshotMemorySharing{}, fmt.Errorf("open snapshot memory: %w", err)
45+
}
46+
defer file.Close()
47+
48+
info, err := file.Stat()
49+
if err != nil {
50+
return snapshotMemorySharing{}, fmt.Errorf("stat snapshot memory: %w", err)
51+
}
52+
if info.Size() == 0 {
53+
return snapshotMemorySharing{}, nil
54+
}
55+
56+
var out snapshotMemorySharing
57+
start := uint64(0)
58+
for {
59+
mapping := snapshotMemoryFiemap{
60+
Start: start,
61+
Length: ^uint64(0),
62+
Flags: snapshotMemoryFiemapFlagSync,
63+
ExtentCount: snapshotMemoryMaxFiemapExtents,
64+
}
65+
66+
_, _, errno := unix.Syscall(unix.SYS_IOCTL, file.Fd(), uintptr(snapshotMemoryFsIocFiemap), uintptr(unsafe.Pointer(&mapping)))
67+
if errno != 0 {
68+
if isFiemapUnsupported(errno) {
69+
return snapshotMemorySharing{Unknown: true}, nil
70+
}
71+
return snapshotMemorySharing{}, fmt.Errorf("fiemap snapshot memory: %w", errno)
72+
}
73+
if mapping.MappedExtents == 0 {
74+
return out, nil
75+
}
76+
77+
last := false
78+
nextStart := start
79+
for i := uint32(0); i < mapping.MappedExtents; i++ {
80+
extent := mapping.Extents[i]
81+
length := int64(extent.Length)
82+
if extent.Flags&snapshotMemoryFiemapExtentShared != 0 {
83+
out.SharedBytes += length
84+
} else {
85+
out.PrivateBytes += length
86+
}
87+
nextStart = extent.Logical + extent.Length
88+
if extent.Flags&snapshotMemoryFiemapExtentLast != 0 {
89+
last = true
90+
}
91+
}
92+
if last || nextStart <= start {
93+
return out, nil
94+
}
95+
start = nextStart
96+
}
97+
}
98+
99+
func isFiemapUnsupported(err error) bool {
100+
return errors.Is(err, unix.ENOTTY) ||
101+
errors.Is(err, unix.EOPNOTSUPP) ||
102+
errors.Is(err, unix.ENOTSUP) ||
103+
errors.Is(err, unix.EINVAL) ||
104+
errors.Is(err, unix.ENOSYS) ||
105+
errors.Is(err, unix.EPERM)
106+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
//go:build !linux
2+
3+
package instances
4+
5+
func inspectSnapshotMemorySharing(string) (snapshotMemorySharing, error) {
6+
return snapshotMemorySharing{Unknown: true}, nil
7+
}

0 commit comments

Comments
 (0)