Skip to content

Commit ebb7cc6

Browse files
authored
feat(keyviz): leader-term wire format + main.go ticker (Phase 2-C+ PR-3b) (#720)
## Summary Build the end-to-end pipeline that takes the sampler-side `RaftGroupID + LeaderTerm` fields shipped in PR #709 (Phase 2-C+ PR-3a) and surfaces them on the wire (proto + JSON + gRPC) so the fan-out aggregator (PR-3c, next) has data to dedupe by `(group, term)` instead of the conservative §4.2 max-merge. ## Why Per `docs/design/2026_04_27_proposed_keyviz_cluster_fanout.md` §4.2, the cluster fan-out today max-merges write samples per row — under stable leadership this is exact, but under a leadership flip it understates the true window total by at most one leader's pre-transfer slice. §9.1's canonical fix is to dedupe by `(bucket_id, raft_group_id, leader_term, column)` and sum across terms; that needs `raft_group_id + leader_term` on every row of the wire format. PR #709 added the fields to `MatrixRow`; this PR plumbs them through. ## Changes **Wire-format additions (forward-compatible):** - `proto/admin.proto`: `KeyVizRow.raft_group_id = 13` and `leader_term = 14`. Old SPAs ignore the new fields; old servers don't emit them — the zero value is the documented "term not tracked" fallback that triggers the legacy max-merge. - `internal/admin/keyviz_handler.go`: JSON `KeyVizRow` gains `RaftGroupID` and `LeaderTerm` (omitempty); `newKeyVizRowFrom` copies `mr.RaftGroupID` / `mr.LeaderTerm`. - `internal/admin/keyviz_fanout.go`: `mergeRowInto` seeds the destination row with `RaftGroupID + LeaderTerm` on first-seen (PR-3c will switch to a per-cell merge). - `adapter/admin_grpc.go`: `newKeyVizRowFrom` copies the same two fields onto the proto `KeyVizRow`. **main.go ticker:** - `startKeyVizLeaderTermPublisher` launches a goroutine that polls each `runtime.engine.Status().Term` every `Step` (matching the flush cadence so each column observes a fresh term). Skips entirely when the sampler is nil or no runtimes are wired. - `publishLeaderTerms` / `publishLeaderTermsFromSnapshots` split into two helpers so unit tests can exercise publication without standing up a full `raftengine.Engine` fake. - The ticker publishes once immediately so the very first flush column carries a non-zero term. ## Scope (PR-3b only) - Wire format **emission**: ✓ this PR - Ticker that **publishes** terms via `SetLeaderTerm`: ✓ this PR - Aggregator **consumption** (per-cell `(group, term)` merge replacing max-merge): **deferred to PR-3c** - `Conflict` per-cell promotion: **deferred to PR-3c** - SPA changes: **none required** — the wire-format extension is forward-compatible With this PR alone, the new fields are visible on every response but the aggregator still uses §4.2 max-merge → no behavior change for SPA consumers. ## Test plan - [x] `go test -race -count=1 ./keyviz/... ./internal/admin/...` — passes - [x] New test coverage: - `TestKeyVizHandlerStampsRaftIdentity` (JSON path) - `TestMergeKeyVizMatricesPreservesRaftIdentity` (fan-out merge) - `TestGetKeyVizMatrixStampsRaftIdentity` (gRPC path) - `TestPublishLeaderTermsFromSnapshotsStampsRows` (end-to-end through sampler) - `TestPublishLeaderTermsFromSnapshotsNilSafe` (nil-receiver contract) - `TestStartKeyVizLeaderTermPublisherSkipsWhenSamplerNil / ...WhenNoRuntimes` (errgroup lifecycle) - [x] `go build ./...` — clean - [x] `golangci-lint run` — 0 issues - [ ] Jepsen — N/A (admin-side wire extension, no replication/MVCC/OCC impact) ## Five-lens self-review 1. **Data loss** — wire-format extension only; legacy max-merge fallback preserves today's behavior when `LeaderTerm=0`. 2. **Concurrency** — ticker is a single goroutine; `SetLeaderTerm` takes `groupTermsMu` (Phase 2-C+ PR-3a contract); no new shared state. 3. **Performance** — `Observe` hot path is unchanged; ticker adds `N × Status()` calls per `Step` where `N` is the runtime count (typically ≤ a few groups), and `Status()` is a non-blocking read. 4. **Data consistency** — `RaftGroupID + LeaderTerm` round-trip through every wire path verified by table-driven tests. 5. **Test coverage** — each layer (JSON, fan-out, gRPC, publisher) has its own focused regression guard; end-to-end test pins the sampler→column propagation. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Keyviz matrix now includes Raft leadership metadata with group ID and leader term fields in responses. * Leadership term information is automatically tracked and included in key visualization output. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
2 parents 75c2c48 + c3f0ce6 commit ebb7cc6

12 files changed

Lines changed: 522 additions & 13 deletions

adapter/admin_grpc.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,12 @@ func matrixToProto(cols []keyviz.MatrixColumn, pick func(keyviz.MatrixRow) uint6
612612
order = append(order, mr.RouteID)
613613
}
614614
pr.Values[j] = pick(mr)
615+
// Per-cell Raft identity stamped from this column's
616+
// MatrixRow (Gemini HIGH on PR #720 — a row-level
617+
// scalar would only capture the first column's
618+
// identity and break the per-cell dedupe goal).
619+
pr.RaftGroupIds[j] = mr.RaftGroupID
620+
pr.LeaderTerms[j] = mr.LeaderTerm
615621
}
616622
}
617623
resp.Rows = make([]*pb.KeyVizRow, len(order))
@@ -676,6 +682,11 @@ func newKeyVizRowFrom(mr keyviz.MatrixRow, numCols int) *pb.KeyVizRow {
676682
RouteCount: total,
677683
RouteIdsTruncated: mr.Aggregate && total > uint64(len(mr.MemberRoutes)),
678684
Values: make([]uint64, numCols),
685+
// Per-cell parallel arrays — populated in the column-loop
686+
// above, not here, because the first MatrixRow seen for a
687+
// route only carries that one column's identity.
688+
RaftGroupIds: make([]uint64, numCols),
689+
LeaderTerms: make([]uint64, numCols),
679690
}
680691
if mr.Aggregate {
681692
row.RouteIds = append([]uint64(nil), mr.MemberRoutes...)

adapter/admin_grpc_keyviz_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,3 +273,48 @@ func TestGetKeyVizMatrixHonorsRowsBudget(t *testing.T) {
273273
require.Equal(t, "route:2", resp.Rows[0].BucketId)
274274
require.Equal(t, "route:4", resp.Rows[1].BucketId)
275275
}
276+
277+
// TestGetKeyVizMatrixStampsRaftIdentity pins the Phase 2-C+ per-cell
278+
// wire extension: MatrixRow.RaftGroupID and MatrixRow.LeaderTerm
279+
// propagate per-column through matrixToProto into the proto
280+
// KeyVizRow's raft_group_ids (field 13) and leader_terms (field 14)
281+
// parallel-to-values arrays. The fan-out aggregator's per-cell
282+
// dedupe key requires per-column identity (Gemini HIGH on PR #720
283+
// — row-level scalars only captured the first column's identity
284+
// and broke dedupe under mid-window leader flips).
285+
func TestGetKeyVizMatrixStampsRaftIdentity(t *testing.T) {
286+
t.Parallel()
287+
t0 := time.Unix(1_700_000_000, 0)
288+
t1 := t0.Add(time.Minute)
289+
srv := newAdminServerWithFakeSampler(t, []keyviz.MatrixColumn{
290+
{
291+
At: t0,
292+
Rows: []keyviz.MatrixRow{
293+
{RouteID: 1, Start: []byte("a"), End: []byte("b"), Writes: 5, RaftGroupID: 7, LeaderTerm: 42},
294+
{RouteID: 2, Start: []byte("b"), End: []byte("c"), Writes: 9, RaftGroupID: 0, LeaderTerm: 0},
295+
},
296+
},
297+
// col 1: term flipped on route 1 mid-window; route 2 absent.
298+
{
299+
At: t1,
300+
Rows: []keyviz.MatrixRow{
301+
{RouteID: 1, Start: []byte("a"), End: []byte("b"), Writes: 11, RaftGroupID: 7, LeaderTerm: 43},
302+
},
303+
},
304+
})
305+
306+
resp, err := srv.GetKeyVizMatrix(context.Background(), &pb.GetKeyVizMatrixRequest{
307+
Series: pb.KeyVizSeries_KEYVIZ_SERIES_WRITES,
308+
})
309+
require.NoError(t, err)
310+
require.Len(t, resp.Rows, 2)
311+
// route:1 — per-cell identity captures the term flip.
312+
require.Equal(t, "route:1", resp.Rows[0].BucketId)
313+
require.Equal(t, []uint64{7, 7}, resp.Rows[0].RaftGroupIds, "raft_group_ids must be parallel to values")
314+
require.Equal(t, []uint64{42, 43}, resp.Rows[0].LeaderTerms,
315+
"leader_terms must capture per-column term so the aggregator can dedupe across the flip")
316+
// route:2 — present only in col0; col1 zero is the "term not tracked" sentinel.
317+
require.Equal(t, "route:2", resp.Rows[1].BucketId)
318+
require.Equal(t, []uint64{0, 0}, resp.Rows[1].RaftGroupIds)
319+
require.Equal(t, []uint64{0, 0}, resp.Rows[1].LeaderTerms)
320+
}

internal/admin/keyviz_fanout.go

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,13 @@ func mergeRowInto(
526526
RouteIDsTruncated: row.RouteIDsTruncated,
527527
RouteCount: row.RouteCount,
528528
Values: make([]uint64, mergedWidth),
529+
// Per-cell parallel arrays sized to the merged column
530+
// width. Stamped per-cell in the loop below from the
531+
// largest-source for that cell, so the identity always
532+
// matches the value we kept (Gemini MEDIUM on PR #720
533+
// resolved by going per-cell).
534+
RaftGroupIDs: make([]uint64, mergedWidth),
535+
LeaderTerms: make([]uint64, mergedWidth),
529536
}
530537
rowsByBucket[row.BucketID] = dst
531538
*bucketOrder = append(*bucketOrder, row.BucketID)
@@ -535,11 +542,42 @@ func mergeRowInto(
535542
if !ok || j >= len(row.Values) {
536543
continue
537544
}
538-
next, conflict := mergeFn(dst.Values[idx], row.Values[j])
545+
prev := dst.Values[idx]
546+
next, conflict := mergeFn(prev, row.Values[j])
539547
dst.Values[idx] = next
540548
if conflict {
541549
dst.Conflict = true
542550
}
551+
// Identity belongs to the source whose value we kept. For
552+
// sumMerge (reads) this is best-effort: prev+incoming has
553+
// no single owner, so we keep whichever side contributed
554+
// most recently — close-to-correct in the steady state.
555+
// For maxMerge (writes), `next == row.Values[j]` exactly
556+
// when the incoming source won the cell; in the tied
557+
// (prev == incoming != 0) case `next == prev`, both sides
558+
// agree on the value, and either identity is acceptable.
559+
if next == row.Values[j] {
560+
// Mixed-version cluster: a legacy peer that does not
561+
// emit raft_group_ids / leader_terms sends empty
562+
// slices. When such a peer's value wins the cell, we
563+
// must EXPLICITLY RESET dst.RaftGroupIDs[idx] /
564+
// dst.LeaderTerms[idx] to 0 (the documented "term not
565+
// tracked" sentinel) — leaving stale identity from a
566+
// previous higher-versioned source would mislabel an
567+
// unknown-term winning cell as a known term and break
568+
// the per-cell dedupe/summing in PR-3c. (Codex P2
569+
// round-2 on PR #720.)
570+
if j < len(row.RaftGroupIDs) {
571+
dst.RaftGroupIDs[idx] = row.RaftGroupIDs[j]
572+
} else {
573+
dst.RaftGroupIDs[idx] = 0
574+
}
575+
if j < len(row.LeaderTerms) {
576+
dst.LeaderTerms[idx] = row.LeaderTerms[j]
577+
} else {
578+
dst.LeaderTerms[idx] = 0
579+
}
580+
}
543581
}
544582
}
545583

internal/admin/keyviz_fanout_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,108 @@ func TestMergeKeyVizMatricesWritesMaxStableLeader(t *testing.T) {
7070
require.False(t, merged.Rows[0].Conflict, "stable-leader merge must not raise conflict")
7171
}
7272

73+
// TestMergeKeyVizMatricesPreservesRaftIdentity pins the Phase 2-C+
74+
// per-cell wire extension on the fan-out merge path: mergeRowInto
75+
// stamps the destination row's per-cell (RaftGroupIDs[idx],
76+
// LeaderTerms[idx]) from whichever source contributed the value
77+
// kept at that cell. Both sources reporting the same identity
78+
// for a writes-max merge is the steady-state shape — merged
79+
// identity matches regardless of source order. (Gemini HIGH on
80+
// PR #720 resolved by going per-cell; row-level scalars would
81+
// only capture the first column's identity and break the per-cell
82+
// dedupe goal.)
83+
func TestMergeKeyVizMatricesPreservesRaftIdentity(t *testing.T) {
84+
t.Parallel()
85+
col := []int64{1_700_000_000_000}
86+
a := KeyVizMatrix{
87+
ColumnUnixMs: col,
88+
Series: keyVizSeriesWrites,
89+
Rows: []KeyVizRow{
90+
{BucketID: "route:5", Values: []uint64{30}, RaftGroupIDs: []uint64{7}, LeaderTerms: []uint64{42}},
91+
},
92+
}
93+
b := KeyVizMatrix{
94+
ColumnUnixMs: col,
95+
Series: keyVizSeriesWrites,
96+
Rows: []KeyVizRow{
97+
{BucketID: "route:5", Values: []uint64{0}, RaftGroupIDs: []uint64{7}, LeaderTerms: []uint64{42}},
98+
},
99+
}
100+
merged := mergeKeyVizMatrices([]KeyVizMatrix{a, b}, keyVizSeriesWrites)
101+
require.Len(t, merged.Rows, 1)
102+
require.Equal(t, []uint64{7}, merged.Rows[0].RaftGroupIDs, "RaftGroupIDs must survive mergeRowInto")
103+
require.Equal(t, []uint64{42}, merged.Rows[0].LeaderTerms, "LeaderTerms must survive mergeRowInto")
104+
}
105+
106+
// TestMergeKeyVizMatricesLegacyPeerWinnerResetsIdentity pins the
107+
// fail-closed semantic for mixed-version clusters (Codex P2
108+
// round-2): when a legacy peer that does not emit raft_group_ids /
109+
// leader_terms (empty slices) wins a cell, dst.RaftGroupIDs[idx]
110+
// and dst.LeaderTerms[idx] must be RESET to 0 (the documented
111+
// "term not tracked" sentinel) — leaving stale identity from a
112+
// previous higher-versioned source would mislabel an unknown-term
113+
// winning cell as a known term and break the per-cell
114+
// dedupe/summing in PR-3c.
115+
func TestMergeKeyVizMatricesLegacyPeerWinnerResetsIdentity(t *testing.T) {
116+
t.Parallel()
117+
col := []int64{1_700_000_000_000}
118+
// Source 1 (modern): reports 30 with identity (group=7, term=42).
119+
modern := KeyVizMatrix{
120+
ColumnUnixMs: col,
121+
Series: keyVizSeriesWrites,
122+
Rows: []KeyVizRow{
123+
{BucketID: "route:5", Values: []uint64{30}, RaftGroupIDs: []uint64{7}, LeaderTerms: []uint64{42}},
124+
},
125+
}
126+
// Source 2 (legacy): reports 50 with NO arrays (older server
127+
// build). Wins maxMerge but cannot identify its term.
128+
legacy := KeyVizMatrix{
129+
ColumnUnixMs: col,
130+
Series: keyVizSeriesWrites,
131+
Rows: []KeyVizRow{
132+
{BucketID: "route:5", Values: []uint64{50}},
133+
},
134+
}
135+
merged := mergeKeyVizMatrices([]KeyVizMatrix{modern, legacy}, keyVizSeriesWrites)
136+
require.Len(t, merged.Rows, 1)
137+
require.Equal(t, []uint64{50}, merged.Rows[0].Values, "legacy peer's value wins maxMerge")
138+
require.Equal(t, []uint64{0}, merged.Rows[0].RaftGroupIDs,
139+
"legacy winner without metadata must reset dst identity to 0 sentinel — not inherit modern's 7")
140+
require.Equal(t, []uint64{0}, merged.Rows[0].LeaderTerms,
141+
"legacy winner without metadata must reset dst term to 0 sentinel — not inherit modern's 42")
142+
}
143+
144+
// TestMergeKeyVizMatricesPerCellIdentityMatchesValueOwner pins the
145+
// Gemini MEDIUM fix: when maxMerge picks a value from one source,
146+
// the identity at that cell must come from the SAME source — not
147+
// from whoever happened to be processed first. Drives a leadership
148+
// flip across two consecutive columns with each leader winning
149+
// one cell.
150+
func TestMergeKeyVizMatricesPerCellIdentityMatchesValueOwner(t *testing.T) {
151+
t.Parallel()
152+
col := []int64{1_700_000_000_000, 1_700_000_001_000}
153+
exLeader := KeyVizMatrix{
154+
ColumnUnixMs: col,
155+
Series: keyVizSeriesWrites,
156+
Rows: []KeyVizRow{
157+
{BucketID: "route:9", Values: []uint64{50, 0}, RaftGroupIDs: []uint64{7, 7}, LeaderTerms: []uint64{42, 42}},
158+
},
159+
}
160+
newLeader := KeyVizMatrix{
161+
ColumnUnixMs: col,
162+
Series: keyVizSeriesWrites,
163+
Rows: []KeyVizRow{
164+
{BucketID: "route:9", Values: []uint64{0, 80}, RaftGroupIDs: []uint64{7, 7}, LeaderTerms: []uint64{43, 43}},
165+
},
166+
}
167+
merged := mergeKeyVizMatrices([]KeyVizMatrix{exLeader, newLeader}, keyVizSeriesWrites)
168+
require.Len(t, merged.Rows, 1)
169+
require.Equal(t, []uint64{50, 80}, merged.Rows[0].Values, "writes max-merge picks the larger per cell")
170+
require.Equal(t, []uint64{7, 7}, merged.Rows[0].RaftGroupIDs, "groupID stays 7 across both cells")
171+
require.Equal(t, []uint64{42, 43}, merged.Rows[0].LeaderTerms,
172+
"col0's identity comes from exLeader (term 42, won 50 vs 0); col1's identity comes from newLeader (term 43, won 80 vs 0)")
173+
}
174+
73175
// TestMergeKeyVizMatricesWritesMaxLeadershipFlip pins §4.2 under a
74176
// mid-window flip: two nodes report non-zero, disagreeing values
75177
// for the same cell. The merge keeps the larger value and raises

internal/admin/keyviz_handler.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,20 @@ type KeyVizRow struct {
8888
RouteCount uint64 `json:"route_count"`
8989
Values []uint64 `json:"values"`
9090
Conflict bool `json:"conflict,omitempty"`
91+
// RaftGroupIDs[j] and LeaderTerms[j] carry the route's Raft
92+
// identity at the time column j was flushed (parallel to
93+
// Values[]). Phase 2-C+ fan-out uses
94+
// (bucket_id, raft_group_id, leader_term, column) as the
95+
// dedupe key. Per-cell representation is required because
96+
// leadership can flip within the requested window; row-level
97+
// scalars would only capture the first column's identity and
98+
// cause incorrect dedupe for later columns (Gemini HIGH on
99+
// PR #720). Zero values mean "term not tracked" — the
100+
// aggregator falls back to the legacy max-merge for those
101+
// cells. The slices are either nil (legacy server, no
102+
// per-column identity to share) or len == len(Values).
103+
RaftGroupIDs []uint64 `json:"raft_group_ids,omitempty"`
104+
LeaderTerms []uint64 `json:"leader_terms,omitempty"`
91105
// total accumulates the sum of Values during pivot so the
92106
// rowBudget sort is O(N log N) on a precomputed key rather
93107
// than O(N log N × M) recomputing the sum per comparison.
@@ -322,6 +336,13 @@ func pivotKeyVizColumns(cols []keyviz.MatrixColumn, series KeyVizSeries, rowBudg
322336
}
323337
v := pick(mr)
324338
row.Values[j] = v
339+
// Per-cell Raft identity stamped from this column's
340+
// MatrixRow. Each MatrixRow is emitted by Flush() with
341+
// the term snapshot taken at that flush moment, so the
342+
// j-th column's identity may differ from j-1 if a leader
343+
// term flipped between flushes (Gemini HIGH on PR #720).
344+
row.RaftGroupIDs[j] = mr.RaftGroupID
345+
row.LeaderTerms[j] = mr.LeaderTerm
325346
row.total += v
326347
}
327348
}
@@ -373,6 +394,11 @@ func newKeyVizRowFrom(mr keyviz.MatrixRow, numCols int) *KeyVizRow {
373394
RouteCount: total,
374395
RouteIDsTruncated: mr.Aggregate && total > uint64(len(mr.MemberRoutes)),
375396
Values: make([]uint64, numCols),
397+
// Per-cell parallel arrays — populated in the column-loop
398+
// above, not here, because the first MatrixRow seen for a
399+
// route only carries that one column's identity.
400+
RaftGroupIDs: make([]uint64, numCols),
401+
LeaderTerms: make([]uint64, numCols),
376402
}
377403
if mr.Aggregate {
378404
row.RouteIDs = append([]uint64(nil), mr.MemberRoutes...)

internal/admin/keyviz_handler_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,3 +498,50 @@ func TestKeyVizHandlerSkipsFanoutForPeerCall(t *testing.T) {
498498
require.Equal(t, 0, peerHits,
499499
"recursion guard violated: handler dialled a peer despite X-Admin-Fanout-Peer being set")
500500
}
501+
502+
// TestKeyVizHandlerStampsRaftIdentity pins the Phase 2-C+ per-cell
503+
// wire extension on the JSON path: MatrixRow.RaftGroupID and
504+
// MatrixRow.LeaderTerm propagate per-column through the pivot into
505+
// the JSON KeyVizRow's raft_group_ids[] and leader_terms[] arrays
506+
// (parallel to values[]). A row that appears in only some columns
507+
// reports zero (the documented "term not tracked" sentinel) for
508+
// the columns where it's absent.
509+
func TestKeyVizHandlerStampsRaftIdentity(t *testing.T) {
510+
t.Parallel()
511+
t0 := time.Unix(1_700_000_000, 0)
512+
t1 := t0.Add(time.Minute)
513+
srv := newKeyVizTestServer(t, &fakeKeyVizSource{cols: []keyviz.MatrixColumn{
514+
{
515+
At: t0,
516+
Rows: []keyviz.MatrixRow{
517+
{RouteID: 1, Start: []byte("a"), End: []byte("b"), Writes: 5, RaftGroupID: 7, LeaderTerm: 42},
518+
{RouteID: 2, Start: []byte("b"), End: []byte("c"), Writes: 9},
519+
},
520+
},
521+
// col 1: route 1 still active (term flipped to 43); route 2 absent.
522+
{
523+
At: t1,
524+
Rows: []keyviz.MatrixRow{
525+
{RouteID: 1, Start: []byte("a"), End: []byte("b"), Writes: 11, RaftGroupID: 7, LeaderTerm: 43},
526+
},
527+
},
528+
}})
529+
defer srv.Close()
530+
531+
resp := keyVizGet(t, srv.URL)
532+
defer resp.Body.Close()
533+
require.Equal(t, http.StatusOK, resp.StatusCode)
534+
var matrix KeyVizMatrix
535+
require.NoError(t, json.NewDecoder(resp.Body).Decode(&matrix))
536+
require.Len(t, matrix.Rows, 2)
537+
r1, r2 := matrix.Rows[0], matrix.Rows[1]
538+
require.Equal(t, "route:1", r1.BucketID)
539+
// Per-cell: col0 was term 42, col1 was term 43 (mid-window flip).
540+
require.Equal(t, []uint64{7, 7}, r1.RaftGroupIDs, "raft_group_ids must be parallel to values")
541+
require.Equal(t, []uint64{42, 43}, r1.LeaderTerms,
542+
"leader_terms must capture per-column term so the aggregator can dedupe across the flip")
543+
// route:2 — present only in col0; col1 zero is the "term not tracked" sentinel.
544+
require.Equal(t, "route:2", r2.BucketID)
545+
require.Equal(t, []uint64{0, 0}, r2.RaftGroupIDs)
546+
require.Equal(t, []uint64{0, 0}, r2.LeaderTerms)
547+
}

0 commit comments

Comments
 (0)