Skip to content

Commit 37ae52e

Browse files
authored
Merge pull request #498 from bootjp/feat/fix-spapshot-cleanup
raftengine/etcd: clean up orphaned snapshot spool files on startup
2 parents bf3f601 + cb2e0a1 commit 37ae52e

4 files changed

Lines changed: 183 additions & 0 deletions

File tree

internal/raftengine/etcd/engine.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"encoding/binary"
77
"log/slog"
8+
"path/filepath"
89
"sort"
910
"strconv"
1011
"sync"
@@ -1317,6 +1318,11 @@ func (e *Engine) persistCreatedSnapshot(snap raftpb.Snapshot) error {
13171318
if err := e.persist.Release(snap); err != nil {
13181319
return errors.WithStack(err)
13191320
}
1321+
1322+
snapDir := filepath.Join(e.dataDir, snapDirName)
1323+
if purgeErr := purgeOldSnapFiles(snapDir); purgeErr != nil {
1324+
slog.Warn("failed to purge old snap files", "error", purgeErr)
1325+
}
13201326
return nil
13211327
}
13221328

@@ -2262,6 +2268,10 @@ func (e *Engine) persistLocalSnapshotPayload(index uint64, payload []byte) error
22622268
_, err = persistLocalSnapshotPayload(e.storage, e.persist, index, payload)
22632269
switch {
22642270
case err == nil:
2271+
snapDir := filepath.Join(e.dataDir, snapDirName)
2272+
if purgeErr := purgeOldSnapFiles(snapDir); purgeErr != nil {
2273+
slog.Warn("failed to purge old snap files", "error", purgeErr)
2274+
}
22652275
return nil
22662276
case errors.Is(err, etcdraft.ErrCompacted):
22672277
return nil

internal/raftengine/etcd/snapshot_spool.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package etcd
33
import (
44
"io"
55
"os"
6+
"path/filepath"
67

78
"github.com/cockroachdb/errors"
89
)
@@ -64,6 +65,24 @@ func (s *snapshotSpool) Reader() (io.Reader, error) {
6465
return s.file, nil
6566
}
6667

68+
// cleanupStaleSnapshotSpools removes orphaned snapshot spool files left behind
69+
// by a previous engine instance that crashed before Close could run.
70+
func cleanupStaleSnapshotSpools(dir string) error {
71+
matches, err := filepath.Glob(filepath.Join(dir, snapshotSpoolPattern))
72+
if err != nil {
73+
return errors.WithStack(err)
74+
}
75+
var combined error
76+
for _, match := range matches {
77+
removeErr := os.Remove(match)
78+
if removeErr == nil || os.IsNotExist(removeErr) {
79+
continue
80+
}
81+
combined = errors.CombineErrors(combined, errors.WithStack(removeErr))
82+
}
83+
return errors.WithStack(combined)
84+
}
85+
6786
func (s *snapshotSpool) Close() error {
6887
if s == nil {
6988
return nil
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package etcd
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"path/filepath"
7+
"testing"
8+
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestCleanupStaleSnapshotSpools(t *testing.T) {
13+
dir := t.TempDir()
14+
15+
// Create several orphaned spool files matching the pattern.
16+
for i := 0; i < 5; i++ {
17+
f, err := os.CreateTemp(dir, snapshotSpoolPattern)
18+
require.NoError(t, err)
19+
require.NoError(t, f.Close())
20+
}
21+
22+
// Create an unrelated file that must not be removed.
23+
unrelated := filepath.Join(dir, "keep-me.txt")
24+
require.NoError(t, os.WriteFile(unrelated, []byte("data"), 0o600))
25+
26+
matches, err := filepath.Glob(filepath.Join(dir, snapshotSpoolPattern))
27+
require.NoError(t, err)
28+
require.Len(t, matches, 5)
29+
30+
require.NoError(t, cleanupStaleSnapshotSpools(dir))
31+
32+
// All spool files should be gone.
33+
matches, err = filepath.Glob(filepath.Join(dir, snapshotSpoolPattern))
34+
require.NoError(t, err)
35+
require.Empty(t, matches)
36+
37+
// Unrelated file should still exist.
38+
_, err = os.Stat(unrelated)
39+
require.NoError(t, err)
40+
}
41+
42+
func TestCleanupStaleSnapshotSpoolsEmptyDir(t *testing.T) {
43+
dir := t.TempDir()
44+
require.NoError(t, cleanupStaleSnapshotSpools(dir))
45+
}
46+
47+
func TestCleanupStaleSnapshotSpoolsNonExistentDir(t *testing.T) {
48+
require.NoError(t, cleanupStaleSnapshotSpools(filepath.Join(t.TempDir(), "no-such-dir")))
49+
}
50+
51+
// createSnapFile creates a fake .snap file with the etcd naming convention.
52+
func createSnapFile(t *testing.T, dir string, term, index uint64) {
53+
t.Helper()
54+
name := fmt.Sprintf("%016x-%016x.snap", term, index)
55+
path := filepath.Join(dir, name)
56+
require.NoError(t, os.WriteFile(path, []byte("fake"), 0o600))
57+
}
58+
59+
func TestPurgeOldSnapFiles(t *testing.T) {
60+
dir := t.TempDir()
61+
62+
// Create 6 snap files at increasing indices.
63+
for i := uint64(1); i <= 6; i++ {
64+
createSnapFile(t, dir, 1, i*10000)
65+
}
66+
67+
// Create a non-snap file that must be preserved.
68+
other := filepath.Join(dir, "db.tmp.12345")
69+
require.NoError(t, os.WriteFile(other, []byte("x"), 0o600))
70+
71+
require.NoError(t, purgeOldSnapFiles(dir))
72+
73+
entries, err := os.ReadDir(dir)
74+
require.NoError(t, err)
75+
76+
var snaps []string
77+
for _, e := range entries {
78+
if filepath.Ext(e.Name()) == ".snap" {
79+
snaps = append(snaps, e.Name())
80+
}
81+
}
82+
83+
// Only the newest 3 should remain.
84+
require.Len(t, snaps, 3)
85+
require.Equal(t, fmt.Sprintf("%016x-%016x.snap", 1, uint64(40000)), snaps[0])
86+
require.Equal(t, fmt.Sprintf("%016x-%016x.snap", 1, uint64(50000)), snaps[1])
87+
require.Equal(t, fmt.Sprintf("%016x-%016x.snap", 1, uint64(60000)), snaps[2])
88+
89+
// Non-snap file preserved.
90+
_, err = os.Stat(other)
91+
require.NoError(t, err)
92+
}
93+
94+
func TestPurgeOldSnapFilesUnderLimit(t *testing.T) {
95+
dir := t.TempDir()
96+
97+
// Only 2 files — under the limit of 3, nothing should be removed.
98+
createSnapFile(t, dir, 1, 1000)
99+
createSnapFile(t, dir, 1, 2000)
100+
101+
require.NoError(t, purgeOldSnapFiles(dir))
102+
103+
entries, err := os.ReadDir(dir)
104+
require.NoError(t, err)
105+
require.Len(t, entries, 2)
106+
}
107+
108+
func TestPurgeOldSnapFilesEmptyDir(t *testing.T) {
109+
dir := t.TempDir()
110+
require.NoError(t, purgeOldSnapFiles(dir))
111+
}

internal/raftengine/etcd/wal_store.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ func openDiskState(cfg OpenConfig, peers []Peer) (*diskState, error) {
3838
return nil, errors.WithStack(err)
3939
}
4040

41+
if err := cleanupStaleSnapshotSpools(cfg.DataDir); err != nil {
42+
return nil, errors.Wrap(err, "cleanup stale snapshot spools")
43+
}
44+
4145
if wal.Exist(walDir) {
4246
return loadWalState(logger, walDir, snapDir, cfg.StateMachine)
4347
}
@@ -336,6 +340,45 @@ func persistLocalSnapshotPayload(storage *etcdraft.MemoryStorage, persist etcdst
336340
return snapshot, nil
337341
}
338342

343+
// defaultMaxSnapFiles is the number of .snap files to retain in the snap
344+
// directory. etcd itself purges old snap files via fileutil.PurgeFile; the
345+
// elastickv etcd engine must do this explicitly.
346+
const defaultMaxSnapFiles = 3
347+
348+
// purgeOldSnapFiles removes old .snap files from snapDir, keeping the most
349+
// recent defaultMaxSnapFiles files. Snap file names encode term and index in
350+
// hex and sort lexicographically from oldest to newest, matching etcd's
351+
// Snapshotter convention.
352+
func purgeOldSnapFiles(snapDir string) error {
353+
entries, err := os.ReadDir(snapDir)
354+
if err != nil {
355+
return errors.WithStack(err)
356+
}
357+
358+
var snaps []string
359+
for _, e := range entries {
360+
if !e.IsDir() && filepath.Ext(e.Name()) == ".snap" {
361+
snaps = append(snaps, e.Name())
362+
}
363+
}
364+
365+
if len(snaps) <= defaultMaxSnapFiles {
366+
return nil
367+
}
368+
369+
// snaps is already sorted ascending (oldest first) because os.ReadDir
370+
// returns entries in directory order which, for zero-padded hex names,
371+
// equals chronological order.
372+
373+
var combined error
374+
for _, name := range snaps[:len(snaps)-defaultMaxSnapFiles] {
375+
if removeErr := os.Remove(filepath.Join(snapDir, name)); removeErr != nil && !os.IsNotExist(removeErr) {
376+
combined = errors.CombineErrors(combined, errors.WithStack(removeErr))
377+
}
378+
}
379+
return errors.WithStack(combined)
380+
}
381+
339382
func buildLocalSnapshot(storage *etcdraft.MemoryStorage, applied uint64, payload []byte) (raftpb.Snapshot, error) {
340383
_, confState, err := storage.InitialState()
341384
if err != nil {

0 commit comments

Comments
 (0)