Skip to content

Commit d9baa77

Browse files
committed
raftengine/etcd: purge old .snap files after snapshot persistence
The etcd Snapshotter.SaveSnap() writes new .snap files but never removes old ones. In upstream etcd this is handled by fileutil.PurgeFile in the etcd server layer, which elastickv does not use. Add purgeOldSnapFiles() that retains the most recent 3 .snap files and call it from both persistLocalSnapshotPayload and persistCreatedSnapshot. This prevents unbounded disk growth on all cluster nodes.
1 parent 84fed8d commit d9baa77

3 files changed

Lines changed: 116 additions & 4 deletions

File tree

internal/raftengine/etcd/engine.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"encoding/binary"
77
"log/slog"
8+
"path/filepath"
89
"sort"
910
"strconv"
1011
"sync"
@@ -1317,6 +1318,11 @@ func (e *Engine) persistCreatedSnapshot(snap raftpb.Snapshot) error {
13171318
if err := e.persist.Release(snap); err != nil {
13181319
return errors.WithStack(err)
13191320
}
1321+
1322+
snapDir := filepath.Join(e.dataDir, snapDirName)
1323+
if purgeErr := purgeOldSnapFiles(snapDir, defaultMaxSnapFiles); purgeErr != nil {
1324+
return errors.Wrap(purgeErr, "purge old snap files")
1325+
}
13201326
return nil
13211327
}
13221328

@@ -2262,16 +2268,18 @@ func (e *Engine) persistLocalSnapshotPayload(index uint64, payload []byte) error
22622268
_, err = persistLocalSnapshotPayload(e.storage, e.persist, index, payload)
22632269
switch {
22642270
case err == nil:
2265-
return nil
22662271
case errors.Is(err, etcdraft.ErrCompacted):
2267-
return nil
22682272
case errors.Is(err, etcdraft.ErrUnavailable):
2269-
return nil
22702273
case errors.Is(err, etcdraft.ErrSnapOutOfDate):
2271-
return nil
22722274
default:
22732275
return err
22742276
}
2277+
2278+
snapDir := filepath.Join(e.dataDir, snapDirName)
2279+
if purgeErr := purgeOldSnapFiles(snapDir, defaultMaxSnapFiles); purgeErr != nil {
2280+
return errors.Wrap(purgeErr, "purge old snap files")
2281+
}
2282+
return nil
22752283
}
22762284

22772285
func encodeReadContext(id uint64) []byte {

internal/raftengine/etcd/snapshot_spool_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package etcd
22

33
import (
4+
"fmt"
45
"os"
56
"path/filepath"
67
"testing"
@@ -42,3 +43,66 @@ func TestCleanupStaleSnapshotSpoolsEmptyDir(t *testing.T) {
4243
dir := t.TempDir()
4344
require.NoError(t, cleanupStaleSnapshotSpools(dir))
4445
}
46+
47+
// createSnapFile creates a fake .snap file with the etcd naming convention.
48+
func createSnapFile(t *testing.T, dir string, term, index uint64) string {
49+
t.Helper()
50+
name := fmt.Sprintf("%016x-%016x.snap", term, index)
51+
path := filepath.Join(dir, name)
52+
require.NoError(t, os.WriteFile(path, []byte("fake"), 0o644))
53+
return path
54+
}
55+
56+
func TestPurgeOldSnapFiles(t *testing.T) {
57+
dir := t.TempDir()
58+
59+
// Create 6 snap files at increasing indices.
60+
for i := uint64(1); i <= 6; i++ {
61+
createSnapFile(t, dir, 1, i*10000)
62+
}
63+
64+
// Create a non-snap file that must be preserved.
65+
other := filepath.Join(dir, "db.tmp.12345")
66+
require.NoError(t, os.WriteFile(other, []byte("x"), 0o644))
67+
68+
require.NoError(t, purgeOldSnapFiles(dir, defaultMaxSnapFiles))
69+
70+
entries, err := os.ReadDir(dir)
71+
require.NoError(t, err)
72+
73+
var snaps []string
74+
for _, e := range entries {
75+
if filepath.Ext(e.Name()) == ".snap" {
76+
snaps = append(snaps, e.Name())
77+
}
78+
}
79+
80+
// Only the newest 3 should remain.
81+
require.Len(t, snaps, 3)
82+
require.Equal(t, fmt.Sprintf("%016x-%016x.snap", 1, uint64(40000)), snaps[0])
83+
require.Equal(t, fmt.Sprintf("%016x-%016x.snap", 1, uint64(50000)), snaps[1])
84+
require.Equal(t, fmt.Sprintf("%016x-%016x.snap", 1, uint64(60000)), snaps[2])
85+
86+
// Non-snap file preserved.
87+
_, err = os.Stat(other)
88+
require.NoError(t, err)
89+
}
90+
91+
func TestPurgeOldSnapFilesUnderLimit(t *testing.T) {
92+
dir := t.TempDir()
93+
94+
// Only 2 files — under the limit of 3, nothing should be removed.
95+
createSnapFile(t, dir, 1, 1000)
96+
createSnapFile(t, dir, 1, 2000)
97+
98+
require.NoError(t, purgeOldSnapFiles(dir, defaultMaxSnapFiles))
99+
100+
entries, err := os.ReadDir(dir)
101+
require.NoError(t, err)
102+
require.Len(t, entries, 2)
103+
}
104+
105+
func TestPurgeOldSnapFilesEmptyDir(t *testing.T) {
106+
dir := t.TempDir()
107+
require.NoError(t, purgeOldSnapFiles(dir, defaultMaxSnapFiles))
108+
}

internal/raftengine/etcd/wal_store.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"os"
66
"path/filepath"
7+
"sort"
78

89
"github.com/cockroachdb/errors"
910
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
@@ -340,6 +341,45 @@ func persistLocalSnapshotPayload(storage *etcdraft.MemoryStorage, persist etcdst
340341
return snapshot, nil
341342
}
342343

344+
// defaultMaxSnapFiles is the number of most recent .snap files to retain in
345+
// the snap directory. etcd itself purges old snap files via
346+
// fileutil.PurgeFile; the elastickv etcd engine must do this explicitly.
// It is the retention count the engine passes to purgeOldSnapFiles.
347+
const defaultMaxSnapFiles = 3
348+
349+
// purgeOldSnapFiles removes old .snap files from snapDir, keeping the most
350+
// recent maxSnap files. Snap file names encode term and index in hex and sort
351+
// lexicographically from oldest to newest, matching etcd's Snapshotter
352+
// convention.
353+
func purgeOldSnapFiles(snapDir string, maxSnap int) error {
354+
entries, err := os.ReadDir(snapDir)
355+
if err != nil {
356+
return errors.WithStack(err)
357+
}
358+
359+
var snaps []string
360+
for _, e := range entries {
361+
if !e.IsDir() && filepath.Ext(e.Name()) == ".snap" {
362+
snaps = append(snaps, e.Name())
363+
}
364+
}
365+
366+
if len(snaps) <= maxSnap {
367+
return nil
368+
}
369+
370+
// Sort ascending (oldest first) — snap names are zero-padded hex so
371+
// lexicographic order equals chronological order.
372+
sort.Strings(snaps)
373+
374+
var combined error
375+
for _, name := range snaps[:len(snaps)-maxSnap] {
376+
if removeErr := os.Remove(filepath.Join(snapDir, name)); removeErr != nil && !os.IsNotExist(removeErr) {
377+
combined = errors.CombineErrors(combined, errors.WithStack(removeErr))
378+
}
379+
}
380+
return errors.WithStack(combined)
381+
}
382+
343383
func buildLocalSnapshot(storage *etcdraft.MemoryStorage, applied uint64, payload []byte) (raftpb.Snapshot, error) {
344384
_, confState, err := storage.InitialState()
345385
if err != nil {

0 commit comments

Comments
 (0)