Skip to content

Commit af84810

Browse files
committed
raftengine/etcd: address second review round
- Fix data-loss risk: purge only runs on successful snapshot persistence (err == nil), not on ErrUnavailable/ErrCompacted
- Make purge best-effort: log warning instead of returning error, so purge failure does not block snapshot progression
- Fix unparam: remove unused maxSnap parameter from purgeOldSnapFiles, use defaultMaxSnapFiles constant directly
- Fix unparam: remove unused return value from createSnapFile test helper
1 parent 89c6ebf commit af84810

3 files changed

Lines changed: 20 additions & 19 deletions

File tree

internal/raftengine/etcd/engine.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1320,8 +1320,8 @@ func (e *Engine) persistCreatedSnapshot(snap raftpb.Snapshot) error {
13201320
}
13211321

13221322
snapDir := filepath.Join(e.dataDir, snapDirName)
1323-
if purgeErr := purgeOldSnapFiles(snapDir, defaultMaxSnapFiles); purgeErr != nil {
1324-
return errors.Wrap(purgeErr, "purge old snap files")
1323+
if purgeErr := purgeOldSnapFiles(snapDir); purgeErr != nil {
1324+
slog.Warn("failed to purge old snap files", "error", purgeErr)
13251325
}
13261326
return nil
13271327
}
@@ -2268,18 +2268,20 @@ func (e *Engine) persistLocalSnapshotPayload(index uint64, payload []byte) error
22682268
_, err = persistLocalSnapshotPayload(e.storage, e.persist, index, payload)
22692269
switch {
22702270
case err == nil:
2271+
snapDir := filepath.Join(e.dataDir, snapDirName)
2272+
if purgeErr := purgeOldSnapFiles(snapDir); purgeErr != nil {
2273+
slog.Warn("failed to purge old snap files", "error", purgeErr)
2274+
}
2275+
return nil
22712276
case errors.Is(err, etcdraft.ErrCompacted):
2277+
return nil
22722278
case errors.Is(err, etcdraft.ErrUnavailable):
2279+
return nil
22732280
case errors.Is(err, etcdraft.ErrSnapOutOfDate):
2281+
return nil
22742282
default:
22752283
return err
22762284
}
2277-
2278-
snapDir := filepath.Join(e.dataDir, snapDirName)
2279-
if purgeErr := purgeOldSnapFiles(snapDir, defaultMaxSnapFiles); purgeErr != nil {
2280-
return errors.Wrap(purgeErr, "purge old snap files")
2281-
}
2282-
return nil
22832285
}
22842286

22852287
func encodeReadContext(id uint64) []byte {

internal/raftengine/etcd/snapshot_spool_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,11 @@ func TestCleanupStaleSnapshotSpoolsNonExistentDir(t *testing.T) {
4949
}
5050

5151
// createSnapFile creates a fake .snap file with the etcd naming convention.
52-
func createSnapFile(t *testing.T, dir string, term, index uint64) string {
52+
func createSnapFile(t *testing.T, dir string, term, index uint64) {
5353
t.Helper()
5454
name := fmt.Sprintf("%016x-%016x.snap", term, index)
5555
path := filepath.Join(dir, name)
5656
require.NoError(t, os.WriteFile(path, []byte("fake"), 0o600))
57-
return path
5857
}
5958

6059
func TestPurgeOldSnapFiles(t *testing.T) {
@@ -69,7 +68,7 @@ func TestPurgeOldSnapFiles(t *testing.T) {
6968
other := filepath.Join(dir, "db.tmp.12345")
7069
require.NoError(t, os.WriteFile(other, []byte("x"), 0o600))
7170

72-
require.NoError(t, purgeOldSnapFiles(dir, defaultMaxSnapFiles))
71+
require.NoError(t, purgeOldSnapFiles(dir))
7372

7473
entries, err := os.ReadDir(dir)
7574
require.NoError(t, err)
@@ -99,7 +98,7 @@ func TestPurgeOldSnapFilesUnderLimit(t *testing.T) {
9998
createSnapFile(t, dir, 1, 1000)
10099
createSnapFile(t, dir, 1, 2000)
101100

102-
require.NoError(t, purgeOldSnapFiles(dir, defaultMaxSnapFiles))
101+
require.NoError(t, purgeOldSnapFiles(dir))
103102

104103
entries, err := os.ReadDir(dir)
105104
require.NoError(t, err)
@@ -108,5 +107,5 @@ func TestPurgeOldSnapFilesUnderLimit(t *testing.T) {
108107

109108
func TestPurgeOldSnapFilesEmptyDir(t *testing.T) {
110109
dir := t.TempDir()
111-
require.NoError(t, purgeOldSnapFiles(dir, defaultMaxSnapFiles))
110+
require.NoError(t, purgeOldSnapFiles(dir))
112111
}

internal/raftengine/etcd/wal_store.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -347,10 +347,10 @@ func persistLocalSnapshotPayload(storage *etcdraft.MemoryStorage, persist etcdst
347347
const defaultMaxSnapFiles = 3
348348

349349
// purgeOldSnapFiles removes old .snap files from snapDir, keeping the most
350-
// recent maxSnap files. Snap file names encode term and index in hex and sort
351-
// lexicographically from oldest to newest, matching etcd's Snapshotter
352-
// convention.
353-
func purgeOldSnapFiles(snapDir string, maxSnap int) error {
350+
// recent defaultMaxSnapFiles files. Snap file names encode term and index in
351+
// hex and sort lexicographically from oldest to newest, matching etcd's
352+
// Snapshotter convention.
353+
func purgeOldSnapFiles(snapDir string) error {
354354
entries, err := os.ReadDir(snapDir)
355355
if err != nil {
356356
return errors.WithStack(err)
@@ -363,7 +363,7 @@ func purgeOldSnapFiles(snapDir string, maxSnap int) error {
363363
}
364364
}
365365

366-
if len(snaps) <= maxSnap {
366+
if len(snaps) <= defaultMaxSnapFiles {
367367
return nil
368368
}
369369

@@ -372,7 +372,7 @@ func purgeOldSnapFiles(snapDir string, maxSnap int) error {
372372
sort.Strings(snaps)
373373

374374
var combined error
375-
for _, name := range snaps[:len(snaps)-maxSnap] {
375+
for _, name := range snaps[:len(snaps)-defaultMaxSnapFiles] {
376376
if removeErr := os.Remove(filepath.Join(snapDir, name)); removeErr != nil && !os.IsNotExist(removeErr) {
377377
combined = errors.CombineErrors(combined, errors.WithStack(removeErr))
378378
}

0 commit comments

Comments
 (0)