Skip to content

Commit d97e3e2

Browse files
committed
Skip shared Firecracker standby compression
1 parent 2b58e51 commit d97e3e2

8 files changed

Lines changed: 301 additions & 5 deletions

lib/instances/firecracker_test.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,7 @@ func TestFirecrackerWarmForkChain(t *testing.T) {
565565
mgr, tmpDir := setupTestManagerForFirecrackerNoNetwork(t)
566566
ctx := context.Background()
567567
p := paths.New(tmpDir)
568+
reflinkOK := probeReflinkSupport(t, tmpDir)
568569

569570
imageManager, err := images.NewManager(p, 1, nil)
570571
require.NoError(t, err)
@@ -609,16 +610,26 @@ func TestFirecrackerWarmForkChain(t *testing.T) {
609610

610611
warm, err := mgr.ForkSnapshot(ctx, snapshot.Id, ForkSnapshotRequest{
611612
Name: "fc-warm-chain-warm",
612-
TargetState: StateRunning,
613+
TargetState: StateStandby,
613614
})
614615
require.NoError(t, err)
616+
require.Equal(t, StateStandby, warm.State)
615617
warmID := warm.Id
616618
warmDeleted := false
617619
t.Cleanup(func() {
618620
if !warmDeleted {
619621
_ = mgr.DeleteInstance(context.Background(), warmID)
620622
}
621623
})
624+
625+
if reflinkOK {
626+
requireSnapshotMemorySharedExtents(t, mgr, p.InstanceSnapshotLatest(warmID), "warm fork from snapshot")
627+
} else {
628+
t.Log("reflink unavailable; skipping shared extent assertion for warm fork")
629+
}
630+
631+
warm, err = mgr.RestoreInstance(ctx, warmID)
632+
require.NoError(t, err)
622633
warm, err = waitForInstanceState(ctx, mgr, warmID, StateRunning, integrationTestTimeout(20*time.Second))
623634
require.NoError(t, err)
624635
require.NoError(t, waitForExecAgent(ctx, mgr, warmID, 30*time.Second))
@@ -647,6 +658,19 @@ func TestFirecrackerWarmForkChain(t *testing.T) {
647658
require.NoError(t, mgr.DeleteSnapshot(ctx, snapshot.Id))
648659
}
649660

661+
func requireSnapshotMemorySharedExtents(t *testing.T, mgr *manager, snapshotDir, label string) {
662+
t.Helper()
663+
664+
rawPath, ok := findRawSnapshotMemoryFile(snapshotDir)
665+
require.True(t, ok, "%s should have a raw snapshot memory file", label)
666+
667+
sharing, err := mgr.inspectSnapshotMemorySharing(rawPath)
668+
require.NoError(t, err, "%s shared extent inspection failed", label)
669+
require.False(t, sharing.Unknown, "%s shared extent inspection returned unknown", label)
670+
require.Greater(t, sharing.SharedBytes, int64(0), "%s should have shared extents", label)
671+
t.Logf("%s memory sharing: shared=%d private=%d path=%s", label, sharing.SharedBytes, sharing.PrivateBytes, rawPath)
672+
}
673+
650674
// TestFirecrackerForkIsolation verifies CoW isolation between a firecracker
651675
// source's standby snapshot and a fork derived from it. A fork must end up
652676
// with its own mem-file inode (reflink-cloned, not hardlinked) so that

lib/instances/manager.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ type manager struct {
131131
compressionMu sync.Mutex
132132
compressionJobs map[string]*compressionJob
133133
compressionTimerFactory func(time.Duration) compressionTimer
134+
sharingInspector func(string) (snapshotMemorySharing, error)
134135
nativeCodecMu sync.Mutex
135136
nativeCodecPaths map[string]string
136137
imageUsageRecorder ImageUsageRecorder

lib/instances/metrics.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ const (
3030
snapshotCompressionResultFailed snapshotCompressionResult = "failed"
3131
)
3232

33+
type snapshotCompressionSkipReason string
34+
35+
const (
36+
snapshotCompressionSkipReasonNone snapshotCompressionSkipReason = "none"
37+
snapshotCompressionSkipReasonSharedExtents snapshotCompressionSkipReason = "shared_extents"
38+
)
39+
3340
type snapshotCompressionWaitOutcome string
3441

3542
const (
@@ -569,14 +576,15 @@ func snapshotCompressionAttributes(hvType hypervisor.Type, algorithm snapshotsto
569576
return attrs
570577
}
571578

572-
func (m *manager) recordSnapshotCompressionJob(ctx context.Context, target compressionTarget, result snapshotCompressionResult, compressionStart *time.Time, uncompressedSize, compressedSize int64) {
579+
func (m *manager) recordSnapshotCompressionJob(ctx context.Context, target compressionTarget, result snapshotCompressionResult, skipReason snapshotCompressionSkipReason, compressionStart *time.Time, uncompressedSize, compressedSize int64) {
573580
if m.metrics == nil {
574581
return
575582
}
576583

577584
attrs := snapshotCompressionAttributes(target.HypervisorType, target.Policy.Algorithm, target.Source)
578585
attrsWithResult := append([]attribute.KeyValue{}, attrs...)
579586
attrsWithResult = append(attrsWithResult, attribute.String("result", string(result)))
587+
attrsWithResult = append(attrsWithResult, attribute.String("reason", string(skipReason)))
580588

581589
m.metrics.snapshotCompressionJobsTotal.Add(ctx, 1, metric.WithAttributes(attrsWithResult...))
582590
if compressionStart != nil {

lib/instances/metrics_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@ func TestSnapshotCompressionMetrics_RecordAndObserve(t *testing.T) {
6161

6262
target := m.compressionJobs["job-1"].target
6363
startedAt := time.Now().Add(-2 * time.Second)
64-
m.recordSnapshotCompressionJob(t.Context(), target, snapshotCompressionResultSuccess, &startedAt, 1024, 256)
65-
m.recordSnapshotCompressionJob(t.Context(), m.compressionJobs["job-2"].target, snapshotCompressionResultSkipped, nil, 0, 0)
64+
m.recordSnapshotCompressionJob(t.Context(), target, snapshotCompressionResultSuccess, snapshotCompressionSkipReasonNone, &startedAt, 1024, 256)
65+
m.recordSnapshotCompressionJob(t.Context(), m.compressionJobs["job-2"].target, snapshotCompressionResultSkipped, snapshotCompressionSkipReasonSharedExtents, nil, 0, 0)
6666
m.recordSnapshotCompressionWait(t.Context(), m.compressionJobs["job-2"].target, snapshotCompressionWaitOutcomeSkipped, time.Now().Add(-1500*time.Millisecond))
6767
m.recordSnapshotCodecFallback(t.Context(), snapshotstore.SnapshotCompressionAlgorithmLz4, snapshotCodecOperationCompress, snapshotCodecFallbackReasonMissingBinary)
6868
m.recordSnapshotRestoreMemoryPrepare(t.Context(), hypervisor.TypeCloudHypervisor, snapshotMemoryPreparePathRaw, snapshotCompressionResultSuccess, time.Now().Add(-250*time.Millisecond))
@@ -96,11 +96,13 @@ func TestSnapshotCompressionMetrics_RecordAndObserve(t *testing.T) {
9696
assert.Equal(t, "cloud-hypervisor", metricLabel(t, point.Attributes, "hypervisor"))
9797
assert.Equal(t, "lz4", metricLabel(t, point.Attributes, "algorithm"))
9898
assert.Equal(t, "standby", metricLabel(t, point.Attributes, "source"))
99+
assert.Equal(t, "none", metricLabel(t, point.Attributes, "reason"))
99100
case "skipped":
100101
assert.Equal(t, int64(1), point.Value)
101102
assert.Equal(t, "qemu", metricLabel(t, point.Attributes, "hypervisor"))
102103
assert.Equal(t, "zstd", metricLabel(t, point.Attributes, "algorithm"))
103104
assert.Equal(t, "standby", metricLabel(t, point.Attributes, "source"))
105+
assert.Equal(t, "shared_extents", metricLabel(t, point.Attributes, "reason"))
104106
default:
105107
t.Fatalf("unexpected compression job result datapoint: %s", metricLabel(t, point.Attributes, "result"))
106108
}
@@ -546,6 +548,7 @@ func TestEnsureSnapshotMemoryReadySkipsPendingCompressionWithoutPreemptionMetric
546548
require.True(t, ok)
547549
require.Len(t, jobs.DataPoints, 1)
548550
assert.Equal(t, "skipped", metricLabel(t, jobs.DataPoints[0].Attributes, "result"))
551+
assert.Equal(t, "none", metricLabel(t, jobs.DataPoints[0].Attributes, "reason"))
549552

550553
waitMetric := findMetric(t, rm, "hypeman_snapshot_compression_wait_duration_seconds")
551554
waitDurations, ok := waitMetric.Data.(metricdata.Histogram[float64])

lib/instances/snapshot_compression.go

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,12 @@ type nativeCodecRuntime struct {
8585
commandContext func(ctx context.Context, name string, arg ...string) *exec.Cmd
8686
}
8787

88+
type snapshotMemorySharing struct {
89+
SharedBytes int64
90+
PrivateBytes int64
91+
Unknown bool
92+
}
93+
8894
func (r nativeCodecRuntime) lookPathFunc() func(string) (string, error) {
8995
if r.lookPath != nil {
9096
return r.lookPath
@@ -511,6 +517,35 @@ func runWithNativeFallback(ctx context.Context, runtime nativeCodecRuntime, algo
511517
return nil
512518
}
513519

520+
func (m *manager) inspectSnapshotMemorySharing(rawPath string) (snapshotMemorySharing, error) {
521+
if m != nil && m.sharingInspector != nil {
522+
return m.sharingInspector(rawPath)
523+
}
524+
return inspectSnapshotMemorySharing(rawPath)
525+
}
526+
527+
func (m *manager) shouldSkipSnapshotCompressionForSharedMemory(ctx context.Context, target compressionTarget, rawPath string) (snapshotMemorySharing, bool) {
528+
if target.HypervisorType != hypervisor.TypeFirecracker || target.Source != snapshotCompressionSourceStandby {
529+
return snapshotMemorySharing{}, false
530+
}
531+
532+
sharing, err := m.inspectSnapshotMemorySharing(rawPath)
533+
if err != nil {
534+
logger.FromContext(ctx).WarnContext(ctx, "failed to inspect snapshot memory sharing; continuing compression",
535+
"owner_id", target.OwnerID,
536+
"snapshot_id", target.SnapshotID,
537+
"snapshot_dir", target.SnapshotDir,
538+
"raw_path", rawPath,
539+
"error", err,
540+
)
541+
return snapshotMemorySharing{}, false
542+
}
543+
if sharing.Unknown || sharing.SharedBytes <= 0 {
544+
return sharing, false
545+
}
546+
return sharing, true
547+
}
548+
514549
func (m *manager) startCompressionJob(ctx context.Context, target compressionTarget) {
515550
if target.Key == "" || !target.Policy.Enabled {
516551
return
@@ -539,6 +574,7 @@ func (m *manager) startCompressionJob(ctx context.Context, target compressionTar
539574

540575
go func() {
541576
result := snapshotCompressionResultSuccess
577+
skipReason := snapshotCompressionSkipReasonNone
542578
var uncompressedSize int64
543579
var compressedSize int64
544580
var spanErr error
@@ -547,7 +583,7 @@ func (m *manager) startCompressionJob(ctx context.Context, target compressionTar
547583
var compressionStart *time.Time
548584

549585
defer func() {
550-
m.recordSnapshotCompressionJob(metricsCtx, target, result, compressionStart, uncompressedSize, compressedSize)
586+
m.recordSnapshotCompressionJob(metricsCtx, target, result, skipReason, compressionStart, uncompressedSize, compressedSize)
551587
if target.Source == snapshotCompressionSourceStandby && target.SnapshotID == "" {
552588
if err := m.clearPendingStandbyCompression(context.Background(), target.OwnerID); err != nil && !errors.Is(err, ErrNotFound) {
553589
log.WarnContext(context.Background(), "failed to clear pending standby compression plan after job completion", "instance_id", target.OwnerID, "error", err)
@@ -667,6 +703,20 @@ func (m *manager) startCompressionJob(ctx context.Context, target compressionTar
667703
}
668704
return
669705
}
706+
if sharing, skip := m.shouldSkipSnapshotCompressionForSharedMemory(compressionCtx, target, rawPath); skip {
707+
result = snapshotCompressionResultSkipped
708+
skipReason = snapshotCompressionSkipReasonSharedExtents
709+
log.InfoContext(compressionCtx, "skipping standby snapshot compression because memory file has shared extents",
710+
"owner_id", target.OwnerID,
711+
"snapshot_id", target.SnapshotID,
712+
"snapshot_dir", target.SnapshotDir,
713+
"raw_path", rawPath,
714+
"hypervisor", string(target.HypervisorType),
715+
"shared_bytes", sharing.SharedBytes,
716+
"private_bytes", sharing.PrivateBytes,
717+
)
718+
return
719+
}
670720

671721
var err error
672722
uncompressedSize, compressedSize, err = compressSnapshotMemoryFileWithRuntime(jobCtx, nativeCodecRuntime{manager: m}, rawPath, target.Policy)

lib/instances/snapshot_compression_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import (
1414
snapshotstore "github.com/kernel/hypeman/lib/snapshot"
1515
"github.com/stretchr/testify/assert"
1616
"github.com/stretchr/testify/require"
17+
otelmetric "go.opentelemetry.io/otel/sdk/metric"
18+
"go.opentelemetry.io/otel/sdk/metric/metricdata"
1719
)
1820

1921
func TestNormalizeCompressionConfig(t *testing.T) {
@@ -433,6 +435,101 @@ func newSnapshotCompressionTestManager(t *testing.T) *manager {
433435
}
434436
}
435437

438+
func TestStartCompressionJobSkipsFirecrackerStandbySharedMemory(t *testing.T) {
439+
mgr := newSnapshotCompressionTestManager(t)
440+
reader := otelmetric.NewManualReader()
441+
provider := otelmetric.NewMeterProvider(otelmetric.WithReader(reader))
442+
metrics, err := newInstanceMetrics(provider.Meter("test"), nil, mgr)
443+
require.NoError(t, err)
444+
mgr.metrics = metrics
445+
446+
snapshotDir := t.TempDir()
447+
rawPath := filepath.Join(snapshotDir, "memory")
448+
require.NoError(t, os.WriteFile(rawPath, []byte("shared firecracker standby memory"), 0o644))
449+
450+
mgr.sharingInspector = func(path string) (snapshotMemorySharing, error) {
451+
assert.Equal(t, rawPath, path)
452+
return snapshotMemorySharing{SharedBytes: 1024, PrivateBytes: 512}, nil
453+
}
454+
455+
target := compressionTarget{
456+
Key: "instance:shared-firecracker",
457+
OwnerID: "shared-firecracker",
458+
SnapshotDir: snapshotDir,
459+
HypervisorType: hypervisor.TypeFirecracker,
460+
Source: snapshotCompressionSourceStandby,
461+
Policy: snapshotstore.SnapshotCompressionConfig{
462+
Enabled: true,
463+
Algorithm: snapshotstore.SnapshotCompressionAlgorithmZstd,
464+
Level: intPtr(1),
465+
},
466+
}
467+
468+
mgr.startCompressionJob(context.Background(), target)
469+
470+
require.Eventually(t, func() bool {
471+
mgr.compressionMu.Lock()
472+
defer mgr.compressionMu.Unlock()
473+
_, ok := mgr.compressionJobs[target.Key]
474+
return !ok
475+
}, time.Second, 10*time.Millisecond)
476+
477+
_, err = os.Stat(rawPath)
478+
require.NoError(t, err, "shared raw memory should remain uncompressed")
479+
_, err = os.Stat(rawPath + ".zst")
480+
require.Error(t, err)
481+
assert.True(t, os.IsNotExist(err))
482+
483+
var rm metricdata.ResourceMetrics
484+
require.NoError(t, reader.Collect(t.Context(), &rm))
485+
jobsMetric := findMetric(t, rm, "hypeman_snapshot_compression_jobs_total")
486+
jobs, ok := jobsMetric.Data.(metricdata.Sum[int64])
487+
require.True(t, ok)
488+
require.Len(t, jobs.DataPoints, 1)
489+
assert.Equal(t, "skipped", metricLabel(t, jobs.DataPoints[0].Attributes, "result"))
490+
assert.Equal(t, "shared_extents", metricLabel(t, jobs.DataPoints[0].Attributes, "reason"))
491+
}
492+
493+
func TestStartCompressionJobDoesNotSkipNonFirecrackerSharedMemory(t *testing.T) {
494+
mgr := newSnapshotCompressionTestManager(t)
495+
496+
snapshotDir := t.TempDir()
497+
rawPath := filepath.Join(snapshotDir, "memory")
498+
require.NoError(t, os.WriteFile(rawPath, []byte("cloud hypervisor memory"), 0o644))
499+
500+
mgr.sharingInspector = func(string) (snapshotMemorySharing, error) {
501+
return snapshotMemorySharing{SharedBytes: 1024}, nil
502+
}
503+
504+
target := compressionTarget{
505+
Key: "instance:shared-cloud-hypervisor",
506+
OwnerID: "shared-cloud-hypervisor",
507+
SnapshotDir: snapshotDir,
508+
HypervisorType: hypervisor.TypeCloudHypervisor,
509+
Source: snapshotCompressionSourceStandby,
510+
Policy: snapshotstore.SnapshotCompressionConfig{
511+
Enabled: true,
512+
Algorithm: snapshotstore.SnapshotCompressionAlgorithmZstd,
513+
Level: intPtr(1),
514+
},
515+
}
516+
517+
mgr.startCompressionJob(context.Background(), target)
518+
519+
require.Eventually(t, func() bool {
520+
mgr.compressionMu.Lock()
521+
defer mgr.compressionMu.Unlock()
522+
_, ok := mgr.compressionJobs[target.Key]
523+
return !ok
524+
}, time.Second, 10*time.Millisecond)
525+
526+
_, err := os.Stat(rawPath)
527+
require.Error(t, err)
528+
assert.True(t, os.IsNotExist(err))
529+
_, _, compressed := findCompressedSnapshotMemoryFile(snapshotDir)
530+
assert.True(t, compressed)
531+
}
532+
436533
func TestStartCompressionJobDelayedCancellationRecordsSkipped(t *testing.T) {
437534
t.Parallel()
438535

0 commit comments

Comments
 (0)