Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions adapter/admin_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,12 @@ func matrixToProto(cols []keyviz.MatrixColumn, pick func(keyviz.MatrixRow) uint6
order = append(order, mr.RouteID)
}
pr.Values[j] = pick(mr)
// Per-cell Raft identity stamped from this column's
// MatrixRow (Gemini HIGH on PR #720 — a row-level
// scalar would only capture the first column's
// identity and break the per-cell dedupe goal).
pr.RaftGroupIds[j] = mr.RaftGroupID
pr.LeaderTerms[j] = mr.LeaderTerm
}
}
resp.Rows = make([]*pb.KeyVizRow, len(order))
Expand Down Expand Up @@ -676,6 +682,11 @@ func newKeyVizRowFrom(mr keyviz.MatrixRow, numCols int) *pb.KeyVizRow {
RouteCount: total,
RouteIdsTruncated: mr.Aggregate && total > uint64(len(mr.MemberRoutes)),
Values: make([]uint64, numCols),
// Per-cell parallel arrays — populated in the column-loop
// above, not here, because the first MatrixRow seen for a
// route only carries that one column's identity.
RaftGroupIds: make([]uint64, numCols),
LeaderTerms: make([]uint64, numCols),
}
if mr.Aggregate {
row.RouteIds = append([]uint64(nil), mr.MemberRoutes...)
Expand Down
45 changes: 45 additions & 0 deletions adapter/admin_grpc_keyviz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,48 @@ func TestGetKeyVizMatrixHonorsRowsBudget(t *testing.T) {
require.Equal(t, "route:2", resp.Rows[0].BucketId)
require.Equal(t, "route:4", resp.Rows[1].BucketId)
}

// TestGetKeyVizMatrixStampsRaftIdentity verifies the gRPC side of the
// Phase 2-C+ per-cell wire extension: each MatrixRow's RaftGroupID
// and LeaderTerm must land in the proto KeyVizRow's RaftGroupIds and
// LeaderTerms slices at the index of the column that MatrixRow came
// from, keeping both slices parallel to Values.
//
// The fixture flips route 1's leader term between the two columns
// and omits route 2 from the second column, so it covers both a
// mid-window term change and the zero "term not tracked" sentinel.
// Per-column identity is what the fan-out aggregator's per-cell
// dedupe key needs (Gemini HIGH on PR #720: a row-level scalar only
// captured the first column's identity).
func TestGetKeyVizMatrixStampsRaftIdentity(t *testing.T) {
	t.Parallel()

	firstFlush := time.Unix(1_700_000_000, 0)
	secondFlush := firstFlush.Add(time.Minute)

	// Column 0: both routes present; route 2 carries no Raft identity
	// (zero-valued RaftGroupID / LeaderTerm).
	col0 := keyviz.MatrixColumn{
		At: firstFlush,
		Rows: []keyviz.MatrixRow{
			{RouteID: 1, Start: []byte("a"), End: []byte("b"), Writes: 5, RaftGroupID: 7, LeaderTerm: 42},
			{RouteID: 2, Start: []byte("b"), End: []byte("c"), Writes: 9},
		},
	}
	// Column 1: route 1's term moved from 42 to 43; route 2 is absent.
	col1 := keyviz.MatrixColumn{
		At: secondFlush,
		Rows: []keyviz.MatrixRow{
			{RouteID: 1, Start: []byte("a"), End: []byte("b"), Writes: 11, RaftGroupID: 7, LeaderTerm: 43},
		},
	}
	srv := newAdminServerWithFakeSampler(t, []keyviz.MatrixColumn{col0, col1})

	req := &pb.GetKeyVizMatrixRequest{Series: pb.KeyVizSeries_KEYVIZ_SERIES_WRITES}
	resp, err := srv.GetKeyVizMatrix(context.Background(), req)
	require.NoError(t, err)
	require.Len(t, resp.Rows, 2)
	flipped, sparse := resp.Rows[0], resp.Rows[1]

	// route:1 must expose the term flip cell by cell.
	require.Equal(t, "route:1", flipped.BucketId)
	require.Equal(t, []uint64{7, 7}, flipped.RaftGroupIds, "raft_group_ids must be parallel to values")
	require.Equal(t, []uint64{42, 43}, flipped.LeaderTerms,
		"leader_terms must capture per-column term so the aggregator can dedupe across the flip")

	// route:2 has no identity in col0 and no sample in col1, so every
	// cell stays at the zero sentinel.
	require.Equal(t, "route:2", sparse.BucketId)
	require.Equal(t, []uint64{0, 0}, sparse.RaftGroupIds)
	require.Equal(t, []uint64{0, 0}, sparse.LeaderTerms)
}
40 changes: 39 additions & 1 deletion internal/admin/keyviz_fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,13 @@ func mergeRowInto(
RouteIDsTruncated: row.RouteIDsTruncated,
RouteCount: row.RouteCount,
Values: make([]uint64, mergedWidth),
// Per-cell parallel arrays sized to the merged column
// width. Stamped per-cell in the loop below from the
// largest-source for that cell, so the identity always
// matches the value we kept (Gemini MEDIUM on PR #720
// resolved by going per-cell).
RaftGroupIDs: make([]uint64, mergedWidth),
LeaderTerms: make([]uint64, mergedWidth),
}
rowsByBucket[row.BucketID] = dst
*bucketOrder = append(*bucketOrder, row.BucketID)
Expand All @@ -535,11 +542,42 @@ func mergeRowInto(
if !ok || j >= len(row.Values) {
continue
}
next, conflict := mergeFn(dst.Values[idx], row.Values[j])
prev := dst.Values[idx]
next, conflict := mergeFn(prev, row.Values[j])
dst.Values[idx] = next
if conflict {
dst.Conflict = true
}
// Identity belongs to the source whose value we kept, which is
// detected by `next == row.Values[j]`. For maxMerge (writes)
// that holds exactly when the incoming source won or tied the
// cell; on a tie (prev == incoming) both sides agree on the
// value, and the incoming identity overwrites the stored one.
// For sumMerge (reads) prev+incoming has no single owner, so
// this is best-effort: the check only holds while prev == 0,
// i.e. the first source to contribute a non-zero value keeps
// the cell's identity and later contributors do not replace it.
if next == row.Values[j] {
// Mixed-version cluster: a legacy peer that does not
// emit raft_group_ids / leader_terms sends empty
// slices. When such a peer's value wins the cell, we
// must EXPLICITLY RESET dst.RaftGroupIDs[idx] /
// dst.LeaderTerms[idx] to 0 (the documented "term not
// tracked" sentinel) — leaving stale identity from a
// previous higher-versioned source would mislabel an
// unknown-term winning cell as a known term and break
// the per-cell dedupe/summing in PR-3c. (Codex P2
// round-2 on PR #720.)
if j < len(row.RaftGroupIDs) {
dst.RaftGroupIDs[idx] = row.RaftGroupIDs[j]
} else {
dst.RaftGroupIDs[idx] = 0
}
if j < len(row.LeaderTerms) {
dst.LeaderTerms[idx] = row.LeaderTerms[j]
} else {
dst.LeaderTerms[idx] = 0
}
}
}
}

Expand Down
102 changes: 102 additions & 0 deletions internal/admin/keyviz_fanout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,108 @@ func TestMergeKeyVizMatricesWritesMaxStableLeader(t *testing.T) {
require.False(t, merged.Rows[0].Conflict, "stable-leader merge must not raise conflict")
}

// TestMergeKeyVizMatricesPreservesRaftIdentity pins the Phase 2-C+
// per-cell wire extension on the fan-out merge path: mergeRowInto
// stamps the destination row's per-cell (RaftGroupIDs[idx],
// LeaderTerms[idx]) from the source that contributed the value kept
// at that cell. Both sources reporting the same identity for a
// writes-max merge is the steady-state shape, and the merged
// identity must match regardless of source order — so the merge is
// exercised with the value-owning source both first and last.
// (Gemini HIGH on PR #720 resolved by going per-cell; row-level
// scalars would only capture the first column's identity and break
// the per-cell dedupe goal.)
func TestMergeKeyVizMatricesPreservesRaftIdentity(t *testing.T) {
	t.Parallel()

	// newSource builds a one-column writes matrix for route:5 that
	// reports the given value under identity (group=7, term=42).
	// Each call allocates fresh slices so the two merge orders below
	// cannot influence each other through shared backing arrays.
	newSource := func(value uint64) KeyVizMatrix {
		return KeyVizMatrix{
			ColumnUnixMs: []int64{1_700_000_000_000},
			Series:       keyVizSeriesWrites,
			Rows: []KeyVizRow{
				{BucketID: "route:5", Values: []uint64{value}, RaftGroupIDs: []uint64{7}, LeaderTerms: []uint64{42}},
			},
		}
	}

	orders := []struct {
		name   string
		values []uint64 // per-source cell value, in merge order
	}{
		{name: "value owner merged first", values: []uint64{30, 0}},
		{name: "value owner merged last", values: []uint64{0, 30}},
	}
	for _, tc := range orders {
		sources := make([]KeyVizMatrix, 0, len(tc.values))
		for _, v := range tc.values {
			sources = append(sources, newSource(v))
		}

		merged := mergeKeyVizMatrices(sources, keyVizSeriesWrites)
		require.Len(t, merged.Rows, 1, "order: %s", tc.name)
		require.Equal(t, []uint64{7}, merged.Rows[0].RaftGroupIDs,
			"RaftGroupIDs must survive mergeRowInto (order: %s)", tc.name)
		require.Equal(t, []uint64{42}, merged.Rows[0].LeaderTerms,
			"LeaderTerms must survive mergeRowInto (order: %s)", tc.name)
	}
}

// TestMergeKeyVizMatricesLegacyPeerWinnerResetsIdentity covers the
// mixed-version cluster case (Codex P2 round-2). A legacy peer sends
// rows without raft_group_ids / leader_terms, i.e. empty slices. If
// that peer's value wins a cell, the merged RaftGroupIDs[idx] and
// LeaderTerms[idx] must be reset to 0, the documented "term not
// tracked" sentinel. Keeping the identity stamped earlier by a
// modern source would label an unknown-term value with a known term
// and break the per-cell dedupe/summing planned for PR-3c.
func TestMergeKeyVizMatricesLegacyPeerWinnerResetsIdentity(t *testing.T) {
	t.Parallel()

	col := []int64{1_700_000_000_000}
	// single wraps one row into a one-column writes matrix.
	single := func(row KeyVizRow) KeyVizMatrix {
		return KeyVizMatrix{
			ColumnUnixMs: col,
			Series:       keyVizSeriesWrites,
			Rows:         []KeyVizRow{row},
		}
	}

	// Modern source: value 30 tagged with identity (group=7, term=42).
	modern := single(KeyVizRow{
		BucketID:     "route:5",
		Values:       []uint64{30},
		RaftGroupIDs: []uint64{7},
		LeaderTerms:  []uint64{42},
	})
	// Legacy source (older server build): value 50 and no identity
	// arrays at all. It wins maxMerge but cannot name its term.
	legacy := single(KeyVizRow{
		BucketID: "route:5",
		Values:   []uint64{50},
	})

	merged := mergeKeyVizMatrices([]KeyVizMatrix{modern, legacy}, keyVizSeriesWrites)
	require.Len(t, merged.Rows, 1)

	got := merged.Rows[0]
	require.Equal(t, []uint64{50}, got.Values, "legacy peer's value wins maxMerge")
	require.Equal(t, []uint64{0}, got.RaftGroupIDs,
		"legacy winner without metadata must reset dst identity to 0 sentinel — not inherit modern's 7")
	require.Equal(t, []uint64{0}, got.LeaderTerms,
		"legacy winner without metadata must reset dst term to 0 sentinel — not inherit modern's 42")
}

// TestMergeKeyVizMatricesPerCellIdentityMatchesValueOwner guards the
// Gemini MEDIUM fix: the identity recorded for a merged cell must
// come from the same source whose value maxMerge kept, not from
// whichever source was merged first. The fixture models a leadership
// flip over two consecutive columns in which the old leader owns the
// only non-zero value in column 0 and the new leader owns the only
// non-zero value in column 1.
func TestMergeKeyVizMatricesPerCellIdentityMatchesValueOwner(t *testing.T) {
	t.Parallel()

	col := []int64{1_700_000_000_000, 1_700_000_001_000}
	// leaderView builds one node's two-column view of route:9 in
	// Raft group 7, reporting the same term for both cells.
	leaderView := func(term uint64, values []uint64) KeyVizMatrix {
		row := KeyVizRow{
			BucketID:     "route:9",
			Values:       values,
			RaftGroupIDs: []uint64{7, 7},
			LeaderTerms:  []uint64{term, term},
		}
		return KeyVizMatrix{
			ColumnUnixMs: col,
			Series:       keyVizSeriesWrites,
			Rows:         []KeyVizRow{row},
		}
	}
	exLeader := leaderView(42, []uint64{50, 0})
	newLeader := leaderView(43, []uint64{0, 80})

	merged := mergeKeyVizMatrices([]KeyVizMatrix{exLeader, newLeader}, keyVizSeriesWrites)
	require.Len(t, merged.Rows, 1)

	got := merged.Rows[0]
	require.Equal(t, []uint64{50, 80}, got.Values, "writes max-merge picks the larger per cell")
	require.Equal(t, []uint64{7, 7}, got.RaftGroupIDs, "groupID stays 7 across both cells")
	require.Equal(t, []uint64{42, 43}, got.LeaderTerms,
		"col0's identity comes from exLeader (term 42, won 50 vs 0); col1's identity comes from newLeader (term 43, won 80 vs 0)")
}

// TestMergeKeyVizMatricesWritesMaxLeadershipFlip pins §4.2 under a
// mid-window flip: two nodes report non-zero, disagreeing values
// for the same cell. The merge keeps the larger value and raises
Expand Down
26 changes: 26 additions & 0 deletions internal/admin/keyviz_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,20 @@ type KeyVizRow struct {
RouteCount uint64 `json:"route_count"`
Values []uint64 `json:"values"`
Conflict bool `json:"conflict,omitempty"`
// RaftGroupIDs[j] and LeaderTerms[j] carry the route's Raft
// identity at the time column j was flushed (parallel to
// Values[]). Phase 2-C+ fan-out uses
// (bucket_id, raft_group_id, leader_term, column) as the
// dedupe key. Per-cell representation is required because
// leadership can flip within the requested window; row-level
// scalars would only capture the first column's identity and
// cause incorrect dedupe for later columns (Gemini HIGH on
// PR #720). Zero values mean "term not tracked" — the
// aggregator falls back to the legacy max-merge for those
// cells. The slices are either nil (legacy server, no
// per-column identity to share) or len == len(Values).
RaftGroupIDs []uint64 `json:"raft_group_ids,omitempty"`
LeaderTerms []uint64 `json:"leader_terms,omitempty"`
// total accumulates the sum of Values during pivot so the
// rowBudget sort is O(N log N) on a precomputed key rather
// than O(N log N × M) recomputing the sum per comparison.
Expand Down Expand Up @@ -322,6 +336,13 @@ func pivotKeyVizColumns(cols []keyviz.MatrixColumn, series KeyVizSeries, rowBudg
}
v := pick(mr)
row.Values[j] = v
// Per-cell Raft identity stamped from this column's
// MatrixRow. Each MatrixRow is emitted by Flush() with
// the term snapshot taken at that flush moment, so the
// j-th column's identity may differ from j-1 if a leader
// term flipped between flushes (Gemini HIGH on PR #720).
row.RaftGroupIDs[j] = mr.RaftGroupID
row.LeaderTerms[j] = mr.LeaderTerm
row.total += v
}
}
Expand Down Expand Up @@ -373,6 +394,11 @@ func newKeyVizRowFrom(mr keyviz.MatrixRow, numCols int) *KeyVizRow {
RouteCount: total,
RouteIDsTruncated: mr.Aggregate && total > uint64(len(mr.MemberRoutes)),
Values: make([]uint64, numCols),
// Per-cell parallel arrays — populated in the column-loop
// above, not here, because the first MatrixRow seen for a
// route only carries that one column's identity.
RaftGroupIDs: make([]uint64, numCols),
LeaderTerms: make([]uint64, numCols),
}
if mr.Aggregate {
row.RouteIDs = append([]uint64(nil), mr.MemberRoutes...)
Expand Down
47 changes: 47 additions & 0 deletions internal/admin/keyviz_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,3 +498,50 @@ func TestKeyVizHandlerSkipsFanoutForPeerCall(t *testing.T) {
require.Equal(t, 0, peerHits,
"recursion guard violated: handler dialled a peer despite X-Admin-Fanout-Peer being set")
}

// TestKeyVizHandlerStampsRaftIdentity verifies the JSON side of the
// Phase 2-C+ per-cell wire extension: the pivot must copy each
// MatrixRow's RaftGroupID and LeaderTerm into the JSON KeyVizRow's
// raft_group_ids[] and leader_terms[] arrays at the index of the
// column that MatrixRow came from, keeping both parallel to values[].
// A row missing from a column reports zero there, the documented
// "term not tracked" sentinel.
func TestKeyVizHandlerStampsRaftIdentity(t *testing.T) {
	t.Parallel()

	firstFlush := time.Unix(1_700_000_000, 0)
	secondFlush := firstFlush.Add(time.Minute)
	source := &fakeKeyVizSource{cols: []keyviz.MatrixColumn{
		// Column 0: both routes present; route 2 has no Raft identity.
		{
			At: firstFlush,
			Rows: []keyviz.MatrixRow{
				{RouteID: 1, Start: []byte("a"), End: []byte("b"), Writes: 5, RaftGroupID: 7, LeaderTerm: 42},
				{RouteID: 2, Start: []byte("b"), End: []byte("c"), Writes: 9},
			},
		},
		// Column 1: route 1 still active with its term flipped to 43;
		// route 2 absent.
		{
			At: secondFlush,
			Rows: []keyviz.MatrixRow{
				{RouteID: 1, Start: []byte("a"), End: []byte("b"), Writes: 11, RaftGroupID: 7, LeaderTerm: 43},
			},
		},
	}}
	srv := newKeyVizTestServer(t, source)
	defer srv.Close()

	resp := keyVizGet(t, srv.URL)
	defer resp.Body.Close()
	require.Equal(t, http.StatusOK, resp.StatusCode)

	var decoded KeyVizMatrix
	err := json.NewDecoder(resp.Body).Decode(&decoded)
	require.NoError(t, err)
	require.Len(t, decoded.Rows, 2)
	flipped, sparse := decoded.Rows[0], decoded.Rows[1]

	// route:1 was term 42 in col0 and term 43 in col1.
	require.Equal(t, "route:1", flipped.BucketID)
	require.Equal(t, []uint64{7, 7}, flipped.RaftGroupIDs, "raft_group_ids must be parallel to values")
	require.Equal(t, []uint64{42, 43}, flipped.LeaderTerms,
		"leader_terms must capture per-column term so the aggregator can dedupe across the flip")

	// route:2 appears only in col0 and without identity, so both
	// cells hold the zero sentinel.
	require.Equal(t, "route:2", sparse.BucketID)
	require.Equal(t, []uint64{0, 0}, sparse.RaftGroupIDs)
	require.Equal(t, []uint64{0, 0}, sparse.LeaderTerms)
}
Loading
Loading