Skip to content

Commit dd9e6eb

Browse files
authored
Merge branch 'main' into feat/read-skew-fix
2 parents 0577118 + c28fd40 commit dd9e6eb

11 files changed

Lines changed: 286 additions & 11 deletions

.github/workflows/claude-code-review.yml renamed to .github/workflows/claude-code-review.yml.disable2

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ jobs:
1414
id-token: write
1515
steps:
1616
- name: Checkout repository
17-
uses: actions/checkout@v4
17+
uses: actions/checkout@v6
1818
with:
1919
fetch-depth: 1
2020

@@ -49,7 +49,7 @@ jobs:
4949
id-token: write
5050
steps:
5151
- name: Checkout repository
52-
uses: actions/checkout@v4
52+
uses: actions/checkout@v6
5353
with:
5454
fetch-depth: 1
5555

@@ -85,7 +85,7 @@ jobs:
8585
id-token: write
8686
steps:
8787
- name: Checkout repository
88-
uses: actions/checkout@v4
88+
uses: actions/checkout@v6
8989
with:
9090
fetch-depth: 1
9191

@@ -121,7 +121,7 @@ jobs:
121121
id-token: write
122122
steps:
123123
- name: Checkout repository
124-
uses: actions/checkout@v4
124+
uses: actions/checkout@v6
125125
with:
126126
fetch-depth: 1
127127

@@ -157,7 +157,7 @@ jobs:
157157
id-token: write
158158
steps:
159159
- name: Checkout repository
160-
uses: actions/checkout@v4
160+
uses: actions/checkout@v6
161161
with:
162162
fetch-depth: 1
163163

.github/workflows/jepsen-test-scheduled.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,23 @@ on:
66
time-limit:
77
description: "Workload runtime seconds"
88
required: false
9-
default: "150"
9+
default: "300"
1010
rate:
1111
description: "Ops/sec per worker"
1212
required: false
13-
default: "10"
13+
default: "5"
1414
concurrency:
1515
description: "Number of worker threads (must be multiple of 4 for S3)"
1616
required: false
17-
default: "8"
17+
default: "4"
1818
key-count:
1919
description: "Number of distinct keys per workload"
2020
required: false
21-
default: "16"
21+
default: "8"
2222
max-writes-per-key:
2323
description: "Maximum writes per key before exhaustion"
2424
required: false
25-
default: "250"
25+
default: "150"
2626

2727
concurrency:
2828
group: ${{ github.workflow }}-${{ github.ref }}-jepsen-scheduled

internal/raftengine/etcd/engine.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"encoding/binary"
77
"log/slog"
8+
"path/filepath"
89
"sort"
910
"strconv"
1011
"sync"
@@ -1317,6 +1318,11 @@ func (e *Engine) persistCreatedSnapshot(snap raftpb.Snapshot) error {
13171318
if err := e.persist.Release(snap); err != nil {
13181319
return errors.WithStack(err)
13191320
}
1321+
1322+
snapDir := filepath.Join(e.dataDir, snapDirName)
1323+
if purgeErr := purgeOldSnapFiles(snapDir); purgeErr != nil {
1324+
slog.Warn("failed to purge old snap files", "error", purgeErr)
1325+
}
13201326
return nil
13211327
}
13221328

@@ -2262,6 +2268,10 @@ func (e *Engine) persistLocalSnapshotPayload(index uint64, payload []byte) error
22622268
_, err = persistLocalSnapshotPayload(e.storage, e.persist, index, payload)
22632269
switch {
22642270
case err == nil:
2271+
snapDir := filepath.Join(e.dataDir, snapDirName)
2272+
if purgeErr := purgeOldSnapFiles(snapDir); purgeErr != nil {
2273+
slog.Warn("failed to purge old snap files", "error", purgeErr)
2274+
}
22652275
return nil
22662276
case errors.Is(err, etcdraft.ErrCompacted):
22672277
return nil

internal/raftengine/etcd/migrate.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,14 @@ func seedMigrationDir(tempDir string, peers []Peer, snapshotData []byte) error {
9696
if err := closePersist(disk.Persist); err != nil {
9797
return err
9898
}
99+
// Persist the peer list so the engine discovers all cluster members on
100+
// first open even when the caller's FactoryConfig.Peers is empty (the
101+
// common case during migration, where --raftBootstrapMembers is not
102+
// repeated). Without this file the engine falls back to a single-node
103+
// configuration and every node elects itself leader independently.
104+
if err := savePersistedPeers(tempDir, state.Snapshot.Metadata.Index, peers); err != nil {
105+
return err
106+
}
99107
return nil
100108
}
101109

internal/raftengine/etcd/migrate_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,49 @@ func TestMigrateFSMStoreSeedsEtcdDataDir(t *testing.T) {
9898
require.NoError(t, err)
9999
require.Equal(t, []byte("one"), value)
100100
}
101+
102+
// TestMigrateFSMStorePersistsSingleNodePeer verifies that MigrateFSMStore
// leaves a loadable persisted-peers file in the destination data dir when it
// is given a one-member peer list.
func TestMigrateFSMStorePersistsSingleNodePeer(t *testing.T) {
103+
sourcePath := filepath.Join(t.TempDir(), "fsm.db")
104+
// Create the source store on disk and close it again before migrating.
source, err := store.NewPebbleStore(sourcePath)
105+
require.NoError(t, err)
106+
require.NoError(t, source.Close())
107+
108+
destDataDir := filepath.Join(t.TempDir(), "raft")
109+
peers := []Peer{{NodeID: 1, ID: "n1", Address: "127.0.0.1:7001"}}
110+
_, err = MigrateFSMStore(sourcePath, destDataDir, peers)
111+
require.NoError(t, err)
112+
113+
// The peers file must be present (ok == true) and hold exactly the one
// peer that was passed to the migration.
loaded, ok, err := LoadPersistedPeers(destDataDir)
114+
require.NoError(t, err)
115+
require.True(t, ok, "persisted peers file must exist after single-node migration")
116+
require.Len(t, loaded, 1)
117+
require.Equal(t, peers[0].NodeID, loaded[0].NodeID)
118+
}
119+
120+
// TestMigrateFSMStorePersistsMultiNodePeers verifies that MigrateFSMStore
// persists every member of a three-node peer list and that NodeID, ID and
// Address all round-trip through LoadPersistedPeers.
func TestMigrateFSMStorePersistsMultiNodePeers(t *testing.T) {
121+
sourcePath := filepath.Join(t.TempDir(), "fsm.db")
122+
// Create the source store on disk and close it again before migrating.
source, err := store.NewPebbleStore(sourcePath)
123+
require.NoError(t, err)
124+
require.NoError(t, source.Close())
125+
126+
destDataDir := filepath.Join(t.TempDir(), "raft")
127+
peers := []Peer{
128+
{NodeID: 1, ID: "n1", Address: "127.0.0.1:7001"},
129+
{NodeID: 2, ID: "n2", Address: "127.0.0.1:7002"},
130+
{NodeID: 3, ID: "n3", Address: "127.0.0.1:7003"},
131+
}
132+
_, err = MigrateFSMStore(sourcePath, destDataDir, peers)
133+
require.NoError(t, err)
134+
135+
// Persisted peers must exist so the engine discovers all cluster members
136+
// even when FactoryConfig.Peers is empty (the common post-migration case).
137+
loaded, ok, err := LoadPersistedPeers(destDataDir)
138+
require.NoError(t, err)
139+
require.True(t, ok, "persisted peers file must exist after migration")
140+
require.Len(t, loaded, 3)
141+
// The comparison is positional: it assumes the loaded peers come back in
// the same order in which they were handed to MigrateFSMStore.
for i, peer := range loaded {
142+
require.Equal(t, peers[i].NodeID, peer.NodeID)
143+
require.Equal(t, peers[i].ID, peer.ID)
144+
require.Equal(t, peers[i].Address, peer.Address)
145+
}
146+
}

internal/raftengine/etcd/snapshot_spool.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package etcd
33
import (
44
"io"
55
"os"
6+
"path/filepath"
67

78
"github.com/cockroachdb/errors"
89
)
@@ -64,6 +65,24 @@ func (s *snapshotSpool) Reader() (io.Reader, error) {
6465
return s.file, nil
6566
}
6667

68+
// cleanupStaleSnapshotSpools removes orphaned snapshot spool files left behind
69+
// by a previous engine instance that crashed before Close could run.
//
// It globs dir for snapshotSpoolPattern and deletes every match. Files that
// have already vanished are ignored. Any other removal failure is recorded
// and the loop continues, so one stubborn file does not prevent the rest from
// being cleaned up; the accumulated failures are returned together at the
// end. A directory with no matches (including one that does not exist, since
// filepath.Glob ignores file system errors) results in no removals.
70+
func cleanupStaleSnapshotSpools(dir string) error {
71+
// filepath.Glob only fails with ErrBadPattern, i.e. when the pattern
// itself is malformed.
matches, err := filepath.Glob(filepath.Join(dir, snapshotSpoolPattern))
72+
if err != nil {
73+
return errors.WithStack(err)
74+
}
75+
var combined error
76+
for _, match := range matches {
77+
removeErr := os.Remove(match)
78+
// A missing file means the goal is already achieved for this entry.
if removeErr == nil || os.IsNotExist(removeErr) {
79+
continue
80+
}
81+
combined = errors.CombineErrors(combined, errors.WithStack(removeErr))
82+
}
83+
// combined is still nil here when every removal succeeded.
return errors.WithStack(combined)
84+
}
85+
6786
func (s *snapshotSpool) Close() error {
6887
if s == nil {
6988
return nil
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package etcd
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"path/filepath"
7+
"testing"
8+
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
// TestCleanupStaleSnapshotSpools verifies that cleanupStaleSnapshotSpools
// deletes every file matching snapshotSpoolPattern in the directory while
// leaving a file with an unrelated name untouched.
func TestCleanupStaleSnapshotSpools(t *testing.T) {
13+
dir := t.TempDir()
14+
15+
// Create several orphaned spool files matching the pattern.
16+
for i := 0; i < 5; i++ {
17+
f, err := os.CreateTemp(dir, snapshotSpoolPattern)
18+
require.NoError(t, err)
19+
require.NoError(t, f.Close())
20+
}
21+
22+
// Create an unrelated file that must not be removed.
23+
unrelated := filepath.Join(dir, "keep-me.txt")
24+
require.NoError(t, os.WriteFile(unrelated, []byte("data"), 0o600))
25+
26+
// Sanity check: the glob used by the cleanup sees all five spool files
// before the cleanup runs.
matches, err := filepath.Glob(filepath.Join(dir, snapshotSpoolPattern))
27+
require.NoError(t, err)
28+
require.Len(t, matches, 5)
29+
30+
require.NoError(t, cleanupStaleSnapshotSpools(dir))
31+
32+
// All spool files should be gone.
33+
matches, err = filepath.Glob(filepath.Join(dir, snapshotSpoolPattern))
34+
require.NoError(t, err)
35+
require.Empty(t, matches)
36+
37+
// Unrelated file should still exist.
38+
_, err = os.Stat(unrelated)
39+
require.NoError(t, err)
40+
}
41+
42+
// TestCleanupStaleSnapshotSpoolsEmptyDir verifies that running the cleanup on
// an existing directory that contains no files returns no error.
func TestCleanupStaleSnapshotSpoolsEmptyDir(t *testing.T) {
43+
dir := t.TempDir()
44+
require.NoError(t, cleanupStaleSnapshotSpools(dir))
45+
}
46+
47+
// TestCleanupStaleSnapshotSpoolsNonExistentDir verifies that running the
// cleanup against a path that was never created returns no error.
func TestCleanupStaleSnapshotSpoolsNonExistentDir(t *testing.T) {
48+
// "no-such-dir" is joined onto a fresh temp dir and never created.
require.NoError(t, cleanupStaleSnapshotSpools(filepath.Join(t.TempDir(), "no-such-dir")))
49+
}
50+
51+
// createSnapFile creates a fake .snap file with the etcd naming convention.
//
// The file is written into dir and named "<term>-<index>.snap", with term and
// index each rendered as 16 zero-padded lowercase hex digits. The content is
// the placeholder bytes "fake"; the helper fails the test if the write fails.
52+
func createSnapFile(t *testing.T, dir string, term, index uint64) {
53+
t.Helper()
54+
name := fmt.Sprintf("%016x-%016x.snap", term, index)
55+
path := filepath.Join(dir, name)
56+
require.NoError(t, os.WriteFile(path, []byte("fake"), 0o600))
57+
}
58+
59+
// TestPurgeOldSnapFiles verifies that purgeOldSnapFiles trims six snap files
// down to the three with the highest indices and does not touch a file whose
// extension is not ".snap".
func TestPurgeOldSnapFiles(t *testing.T) {
60+
dir := t.TempDir()
61+
62+
// Create 6 snap files at increasing indices.
63+
for i := uint64(1); i <= 6; i++ {
64+
createSnapFile(t, dir, 1, i*10000)
65+
}
66+
67+
// Create a non-snap file that must be preserved.
68+
other := filepath.Join(dir, "db.tmp.12345")
69+
require.NoError(t, os.WriteFile(other, []byte("x"), 0o600))
70+
71+
require.NoError(t, purgeOldSnapFiles(dir))
72+
73+
// os.ReadDir returns entries sorted by filename, so the positional
// assertions on snaps[0..2] below are deterministic.
entries, err := os.ReadDir(dir)
74+
require.NoError(t, err)
75+
76+
var snaps []string
77+
for _, e := range entries {
78+
if filepath.Ext(e.Name()) == ".snap" {
79+
snaps = append(snaps, e.Name())
80+
}
81+
}
82+
83+
// Only the newest 3 should remain.
84+
require.Len(t, snaps, 3)
85+
require.Equal(t, fmt.Sprintf("%016x-%016x.snap", 1, uint64(40000)), snaps[0])
86+
require.Equal(t, fmt.Sprintf("%016x-%016x.snap", 1, uint64(50000)), snaps[1])
87+
require.Equal(t, fmt.Sprintf("%016x-%016x.snap", 1, uint64(60000)), snaps[2])
88+
89+
// Non-snap file preserved.
90+
_, err = os.Stat(other)
91+
require.NoError(t, err)
92+
}
93+
94+
// TestPurgeOldSnapFilesUnderLimit verifies that purgeOldSnapFiles removes
// nothing when the directory holds fewer snap files than the retention limit.
func TestPurgeOldSnapFilesUnderLimit(t *testing.T) {
95+
dir := t.TempDir()
96+
97+
// Only 2 files — under the limit of 3, nothing should be removed.
98+
createSnapFile(t, dir, 1, 1000)
99+
createSnapFile(t, dir, 1, 2000)
100+
101+
require.NoError(t, purgeOldSnapFiles(dir))
102+
103+
// Both files created above must still be listed.
entries, err := os.ReadDir(dir)
104+
require.NoError(t, err)
105+
require.Len(t, entries, 2)
106+
}
107+
108+
// TestPurgeOldSnapFilesEmptyDir verifies that purging an existing directory
// that contains no files returns no error.
func TestPurgeOldSnapFilesEmptyDir(t *testing.T) {
109+
dir := t.TempDir()
110+
require.NoError(t, purgeOldSnapFiles(dir))
111+
}

internal/raftengine/etcd/wal_store.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ func openDiskState(cfg OpenConfig, peers []Peer) (*diskState, error) {
3838
return nil, errors.WithStack(err)
3939
}
4040

41+
if err := cleanupStaleSnapshotSpools(cfg.DataDir); err != nil {
42+
return nil, errors.Wrap(err, "cleanup stale snapshot spools")
43+
}
44+
4145
if wal.Exist(walDir) {
4246
return loadWalState(logger, walDir, snapDir, cfg.StateMachine)
4347
}
@@ -336,6 +340,45 @@ func persistLocalSnapshotPayload(storage *etcdraft.MemoryStorage, persist etcdst
336340
return snapshot, nil
337341
}
338342

343+
// defaultMaxSnapFiles is the number of .snap files to retain in the snap
344+
// directory. etcd itself purges old snap files via fileutil.PurgeFile; the
345+
// elastickv etcd engine must do this explicitly.
346+
const defaultMaxSnapFiles = 3
347+
348+
// purgeOldSnapFiles removes old .snap files from snapDir, keeping the most
349+
// recent defaultMaxSnapFiles files. Snap file names encode term and index in
350+
// hex and sort lexicographically from oldest to newest, matching etcd's
351+
// Snapshotter convention.
//
// Only non-directory entries whose extension is exactly ".snap" are
// candidates; every other entry in snapDir is left alone. A failure to read
// snapDir is returned immediately. Removal failures do not stop the loop:
// a file that is already gone is ignored, and all other failures are
// combined and returned after every candidate has been attempted.
352+
func purgeOldSnapFiles(snapDir string) error {
353+
entries, err := os.ReadDir(snapDir)
354+
if err != nil {
355+
return errors.WithStack(err)
356+
}
357+
358+
var snaps []string
359+
for _, e := range entries {
360+
if !e.IsDir() && filepath.Ext(e.Name()) == ".snap" {
361+
snaps = append(snaps, e.Name())
362+
}
363+
}
364+
365+
// Nothing to purge while the count is at or below the retention limit.
if len(snaps) <= defaultMaxSnapFiles {
366+
return nil
367+
}
368+
369+
// snaps is already sorted ascending (oldest first): os.ReadDir returns
370+
// entries sorted by filename (not raw directory order), and for the
371+
// zero-padded hex names that equals chronological order.
372+
373+
var combined error
374+
// Everything before the last defaultMaxSnapFiles names is surplus.
for _, name := range snaps[:len(snaps)-defaultMaxSnapFiles] {
375+
if removeErr := os.Remove(filepath.Join(snapDir, name)); removeErr != nil && !os.IsNotExist(removeErr) {
376+
combined = errors.CombineErrors(combined, errors.WithStack(removeErr))
377+
}
378+
}
379+
// combined is still nil here when every removal succeeded.
return errors.WithStack(combined)
380+
}
381+
339382
func buildLocalSnapshot(storage *etcdraft.MemoryStorage, applied uint64, payload []byte) (raftpb.Snapshot, error) {
340383
_, confState, err := storage.InitialState()
341384
if err != nil {

multiraft_runtime.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ func detectRaftEngineFromDataDir(dir string) (raftEngineType, bool, error) {
140140
return "", false, err
141141
}
142142
etcdArtifacts, err := hasRaftArtifacts(dir,
143+
"wal",
144+
"snap",
143145
filepath.Join("member", "wal"),
144146
filepath.Join("member", "snap"),
145147
"etcd-raft-state.bin",

0 commit comments

Comments
 (0)