Skip to content

Commit c3f0ce6

Browse files
committed
feat(keyviz): address PR #720 round-2 (Codex P2 ×2)
Two P2 items from Codex's second pass on `d7e5c43a`:

- Codex P2 #1 (keyviz_fanout.go) — fail-closed semantic for legacy peer winners. The previous code used a `j < len(row.RaftGroupIDs)` guard that SKIPPED the assignment when the incoming source had empty arrays (legacy peer in mixed-version cluster). That left the stale identity from a previous higher-versioned source in `dst.RaftGroupIDs[idx]`, mislabeling an unknown-term winning cell as a known term and breaking the per-cell dedupe/summing in PR-3c. Fixed by EXPLICITLY RESETTING to 0 (the documented "term not tracked" sentinel) when the incoming source wins but its metadata array has no entry at index j. Caller audit: only mergeKeyVizMatrices calls mergeRowInto; no external consumer relies on the stale-identity behaviour.

- Codex P2 #2 (main.go publishLeaderTerms) — race between the publisher's read of rt.engine and rt.Close()'s write. Even after round-1's local-snapshot fix, the initial read remained unsynchronized with Close()'s write. Fixed structurally by adding engineMu sync.RWMutex to raftGroupRuntime, exposing snapshotEngine() that takes the read lock, and updating Close() to take the write lock around clearing the field. Caller audit (per loop instruction — semantic change to field access pattern): grepped all `rt.engine` / `r.engine` reads:
  - main.go:858, 1104, 1108, 1115 — startup wiring, single-threaded BEFORE any goroutine launches; safe to read directly.
  - main_admin.go:828, 831, 837 (newClusterInfoSource) — HTTP/gRPC handler closure, runs concurrent with rt.Close(). SAME race class as the publisher. Migrated to snapshotEngine() for consistency.
  - multiraft_runtime.go:Close() — now takes engineMu write lock.
  No other readers exist outside startup.

Test: TestMergeKeyVizMatricesLegacyPeerWinnerResetsIdentity drives a mixed-version cluster scenario (modern source reports 30 with (group=7, term=42); legacy source reports 50 with empty arrays). Legacy wins maxMerge with value=50; the test asserts dst.RaftGroupIDs / LeaderTerms are reset to 0 instead of inheriting modern's identity.

go test -race ./keyviz/... ./internal/admin/... — pass. golangci-lint run (full project) — 0 issues.
1 parent d7e5c43 commit c3f0ce6

5 files changed

Lines changed: 111 additions & 26 deletions

File tree

internal/admin/keyviz_fanout.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -556,11 +556,27 @@ func mergeRowInto(
556556
// when the incoming source won the cell; in the tied
557557
// (prev == incoming != 0) case `next == prev`, both sides
558558
// agree on the value, and either identity is acceptable.
559-
if next == row.Values[j] && j < len(row.RaftGroupIDs) {
560-
dst.RaftGroupIDs[idx] = row.RaftGroupIDs[j]
561-
}
562-
if next == row.Values[j] && j < len(row.LeaderTerms) {
563-
dst.LeaderTerms[idx] = row.LeaderTerms[j]
559+
if next == row.Values[j] {
560+
// Mixed-version cluster: a legacy peer that does not
561+
// emit raft_group_ids / leader_terms sends empty
562+
// slices. When such a peer's value wins the cell, we
563+
// must EXPLICITLY RESET dst.RaftGroupIDs[idx] /
564+
// dst.LeaderTerms[idx] to 0 (the documented "term not
565+
// tracked" sentinel) — leaving stale identity from a
566+
// previous higher-versioned source would mislabel an
567+
// unknown-term winning cell as a known term and break
568+
// the per-cell dedupe/summing in PR-3c. (Codex P2
569+
// round-2 on PR #720.)
570+
if j < len(row.RaftGroupIDs) {
571+
dst.RaftGroupIDs[idx] = row.RaftGroupIDs[j]
572+
} else {
573+
dst.RaftGroupIDs[idx] = 0
574+
}
575+
if j < len(row.LeaderTerms) {
576+
dst.LeaderTerms[idx] = row.LeaderTerms[j]
577+
} else {
578+
dst.LeaderTerms[idx] = 0
579+
}
564580
}
565581
}
566582
}

internal/admin/keyviz_fanout_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,44 @@ func TestMergeKeyVizMatricesPreservesRaftIdentity(t *testing.T) {
103103
require.Equal(t, []uint64{42}, merged.Rows[0].LeaderTerms, "LeaderTerms must survive mergeRowInto")
104104
}
105105

106+
// TestMergeKeyVizMatricesLegacyPeerWinnerResetsIdentity pins the
107+
// fail-closed semantic for mixed-version clusters (Codex P2
108+
// round-2): when a legacy peer that does not emit raft_group_ids /
109+
// leader_terms (empty slices) wins a cell, dst.RaftGroupIDs[idx]
110+
// and dst.LeaderTerms[idx] must be RESET to 0 (the documented
111+
// "term not tracked" sentinel) — leaving stale identity from a
112+
// previous higher-versioned source would mislabel an unknown-term
113+
// winning cell as a known term and break the per-cell
114+
// dedupe/summing in PR-3c.
115+
func TestMergeKeyVizMatricesLegacyPeerWinnerResetsIdentity(t *testing.T) {
116+
t.Parallel()
117+
col := []int64{1_700_000_000_000}
118+
// Source 1 (modern): reports 30 with identity (group=7, term=42).
119+
modern := KeyVizMatrix{
120+
ColumnUnixMs: col,
121+
Series: keyVizSeriesWrites,
122+
Rows: []KeyVizRow{
123+
{BucketID: "route:5", Values: []uint64{30}, RaftGroupIDs: []uint64{7}, LeaderTerms: []uint64{42}},
124+
},
125+
}
126+
// Source 2 (legacy): reports 50 and leaves RaftGroupIDs / LeaderTerms
127+
// unset (older server build). Wins maxMerge (50 > 30) with no identity.
128+
legacy := KeyVizMatrix{
129+
ColumnUnixMs: col,
130+
Series: keyVizSeriesWrites,
131+
Rows: []KeyVizRow{
132+
{BucketID: "route:5", Values: []uint64{50}},
133+
},
134+
}
135+
merged := mergeKeyVizMatrices([]KeyVizMatrix{modern, legacy}, keyVizSeriesWrites)
136+
require.Len(t, merged.Rows, 1)
137+
require.Equal(t, []uint64{50}, merged.Rows[0].Values, "legacy peer's value wins maxMerge")
138+
require.Equal(t, []uint64{0}, merged.Rows[0].RaftGroupIDs,
139+
"legacy winner without metadata must reset dst identity to 0 sentinel — not inherit modern's 7")
140+
require.Equal(t, []uint64{0}, merged.Rows[0].LeaderTerms,
141+
"legacy winner without metadata must reset dst term to 0 sentinel — not inherit modern's 42")
142+
}
143+
106144
// TestMergeKeyVizMatricesPerCellIdentityMatchesValueOwner pins the
107145
// Gemini MEDIUM fix: when maxMerge picks a value from one source,
108146
// the identity at that cell must come from the SAME source — not

main.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1559,18 +1559,15 @@ type groupTermSnapshot struct {
15591559
func publishLeaderTerms(s *keyviz.MemSampler, runtimes []*raftGroupRuntime) {
15601560
snaps := make([]groupTermSnapshot, 0, len(runtimes))
15611561
for _, rt := range runtimes {
1562-
if rt == nil {
1563-
continue
1564-
}
1565-
// Snapshot rt.engine into a local once so a concurrent
1566-
// rt.Close() (which sets rt.engine = nil) cannot race the
1567-
// nil-check and the Status() call. On startup-error paths
1568-
// that return before joining all goroutines (e.g.
1569-
// setupAdminService failing), this goroutine can otherwise
1570-
// observe rt.engine going nil between the two reads and
1571-
// panic / trigger a race-detector failure. (Codex P2 on
1572-
// PR #720.)
1573-
engine := rt.engine
1562+
// snapshotEngine takes engineMu.RLock so a concurrent
1563+
// rt.Close() (which clears rt.engine while holding the
1564+
// write lock) cannot race the publisher's read. On
1565+
// startup-error paths that fire cleanup before
1566+
// joining all goroutines, this lock prevents the
1567+
// race-detector failure and the undefined-behavior
1568+
// nil-pointer dereference Codex round-1/round-2 P2
1569+
// flagged on PR #720.
1570+
engine := rt.snapshotEngine()
15741571
if engine == nil {
15751572
continue
15761573
}

main_admin.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -825,16 +825,22 @@ func newClusterInfoSource(nodeID, version string, runtimes []*raftGroupRuntime)
825825
return admin.ClusterInfoFunc(func(ctx context.Context) (admin.ClusterInfo, error) {
826826
groups := make([]admin.GroupInfo, 0, len(runtimes))
827827
for _, rt := range runtimes {
828-
if rt == nil || rt.engine == nil {
828+
// snapshotEngine takes engineMu.RLock so this
829+
// HTTP/gRPC handler running on its own goroutine
830+
// cannot race rt.Close() clearing the field on
831+
// shutdown. Same race class as the keyviz
832+
// leader-term publisher (Codex P2 on PR #720).
833+
engine := rt.snapshotEngine()
834+
if engine == nil {
829835
continue
830836
}
831-
status := rt.engine.Status()
837+
status := engine.Status()
832838
// Seed as an empty-but-non-nil slice so a
833839
// Configuration() failure still JSON-encodes as `[]`
834840
// rather than `null`; API consumers that treat
835841
// members as an always-array field rely on this.
836842
members := []string{}
837-
if cfg, err := rt.engine.Configuration(ctx); err == nil {
843+
if cfg, err := engine.Configuration(ctx); err == nil {
838844
members = make([]string, 0, len(cfg.Servers))
839845
for _, srv := range cfg.Servers {
840846
members = append(members, srv.ID)

multiraft_runtime.go

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"os"
77
"path/filepath"
88
"strings"
9+
"sync"
910

1011
"github.com/bootjp/elastickv/internal/raftengine"
1112
"github.com/bootjp/elastickv/store"
@@ -14,14 +15,38 @@ import (
1415
)
1516

1617
type raftGroupRuntime struct {
17-
spec groupSpec
18-
engine raftengine.Engine
19-
store store.MVCCStore
18+
spec groupSpec
19+
// engineMu guards engine to allow concurrent readers
20+
// (specifically the keyviz leader-term publisher goroutine that
21+
// runs on a ticker independent of startup ordering) to coexist
22+
// with Close() which clears the field on shutdown. Direct field
23+
// reads from the startup path remain safe because they execute
24+
// single-threaded before any goroutine is launched; long-lived
25+
// goroutines that read after startup must go through
26+
// snapshotEngine() (Codex P2 on PR #720).
27+
engineMu sync.RWMutex
28+
engine raftengine.Engine
29+
30+
store store.MVCCStore
2031

2132
registerTransport func(grpc.ServiceRegistrar)
2233
closeFactory func() error // releases factory-created resources (transport, stores)
2334
}
2435

36+
// snapshotEngine returns the current engine reference under engineMu's
37+
// read lock. Goroutines that may run while Close() executes (the keyviz
38+
// leader-term publisher, the admin cluster-info source) MUST use this
39+
// accessor instead of reading the engine field directly. Returns nil
40+
// for a nil receiver or once Close() has cleared the field.
41+
func (r *raftGroupRuntime) snapshotEngine() raftengine.Engine {
42+
if r == nil {
43+
return nil
44+
}
45+
r.engineMu.RLock()
46+
defer r.engineMu.RUnlock()
47+
return r.engine
48+
}
49+
2550
const raftEngineMarkerPerm = 0o600
2651

2752
type raftEngineType string
@@ -50,11 +75,14 @@ func (r *raftGroupRuntime) Close() {
5075
if r == nil {
5176
return
5277
}
53-
if r.engine != nil {
54-
if err := r.engine.Close(); err != nil {
78+
r.engineMu.Lock()
79+
engine := r.engine
80+
r.engine = nil
81+
r.engineMu.Unlock()
82+
if engine != nil {
83+
if err := engine.Close(); err != nil {
5584
slog.Warn("failed to close raft engine", "error", err)
5685
}
57-
r.engine = nil
5886
}
5987
if r.closeFactory != nil {
6088
if err := r.closeFactory(); err != nil {

0 commit comments

Comments
 (0)