Skip to content

Commit eab91c9

Browse files
committed
feat(keyviz): address PR #709 round-1 (Claude bot 4 items)
- groupID=0 reserved for virtual aggregate buckets: SetLeaderTerm now silently ignores calls with groupID=0 (early-return) and the doc explicitly notes the reservation. Without this guard, a future caller that accidentally publishes a non-zero term for groupID=0 would cause appendDrainedRow's `terms[groupID]` lookup to stamp aggregate rows with that term, breaking the fan-out merge's max-merge fallback for cross-group cells. - TestSetLeaderTermZeroGroupIDDoesNotPolluteVirtualBucket reproduces the bug (fails without the guard) and pins the post-fix invariant that aggregate rows always emit LeaderTerm=0 even when SetLeaderTerm(0, x) was called. - TestVirtualBucketRaftGroupIDIsZero closes the gap that no test pinned: even when member routes are registered with non-zero groupIDs, the over-budget virtual bucket must emit RaftGroupID=0 so the fan-out merge falls back to max-merge for cross-group aggregate cells. - TestRegisterRouteSecondCallIgnoresGroupID locks the live-slot idempotency contract documented on RegisterRoute: a second call with a different groupID returns true without updating the slot's metadata. Callers that need to retag must RemoveRoute + RegisterRoute. - snapshotGroupTerms returns nil when no terms have been published yet, avoiding the per-Flush map allocation before PR-3b wires SetLeaderTerm. `nil[k]` returns the zero value in Go, so the row-builder hot path needs no nil guard. - Cleanup: stale comment leftover ("// TestNonPositive... pins Codex round-8 P2: a") that was orphaned when the TestRegisterRouteCarriesGroupID block was inserted.
1 parent e7f3bad commit eab91c9

2 files changed

Lines changed: 116 additions & 2 deletions

File tree

keyviz/sampler.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,9 +377,15 @@ func NewMemSampler(opts MemSamplerOptions) *MemSampler {
377377
// nodes that have not finished engine startup contribute partial data
378378
// without poisoning the merge.
379379
//
380+
// groupID == 0 is reserved for virtual aggregate buckets (which span
381+
// multiple real groups). Calls with groupID == 0 are silently ignored
382+
// so a future caller cannot accidentally stamp a non-zero term on
383+
// aggregate rows — they must remain LeaderTerm == 0 so the fan-out
384+
// merge falls back to max-merge for cross-group cells.
385+
//
380386
// nil-receiver-safe.
381387
func (s *MemSampler) SetLeaderTerm(groupID, term uint64) {
382-
if s == nil {
388+
if s == nil || groupID == 0 {
383389
return
384390
}
385391
s.groupTermsMu.Lock()
@@ -390,9 +396,15 @@ func (s *MemSampler) SetLeaderTerm(groupID, term uint64) {
390396
// snapshotGroupTerms returns a copy of the per-group term map. Called
391397
// at the top of Flush so the column built below sees a stable view
392398
// of the term mapping even if SetLeaderTerm fires concurrently.
399+
// Returns nil when no terms have been published yet — `nil[k]` returns
400+
// the zero value in Go, so the row-builder hot path needs no nil guard
401+
// and avoids one allocation per Flush before SetLeaderTerm is wired.
393402
func (s *MemSampler) snapshotGroupTerms() map[uint64]uint64 {
394403
s.groupTermsMu.RLock()
395404
defer s.groupTermsMu.RUnlock()
405+
if len(s.groupTerms) == 0 {
406+
return nil
407+
}
396408
out := make(map[uint64]uint64, len(s.groupTerms))
397409
for g, t := range s.groupTerms {
398410
out[g] = t

keyviz/sampler_test.go

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -843,7 +843,6 @@ func TestReRegisterDuringPruneGraceCancelsPrune(t *testing.T) {
843843
}
844844
}
845845

846-
// TestNonPositiveOptionsFallBackToDefaults pins Codex round-8 P2: a
847846
// TestRegisterRouteCarriesGroupID pins the Phase 2-C+ contract that
848847
// MatrixRow.RaftGroupID echoes the groupID supplied at RegisterRoute,
849848
// independent of leader-term tracking. A route registered with
@@ -908,6 +907,109 @@ func TestSetLeaderTermNilSafe(t *testing.T) {
908907
s.SetLeaderTerm(1, 42) // must not panic
909908
}
910909

910+
// TestSetLeaderTermZeroGroupIDDoesNotPolluteVirtualBucket pins the
911+
// invariant that virtual aggregate buckets (which span multiple real
912+
// groups and stamp RaftGroupID=0) must always emit LeaderTerm=0.
913+
// Without the SetLeaderTerm groupID==0 guard, a caller that mistakenly
914+
// publishes a term for groupID=0 would cause appendDrainedRow's
915+
// `terms[groupID]` lookup to stamp the virtual bucket with that
916+
// non-zero term, breaking the fan-out merge's max-merge fallback for
917+
// cross-group cells.
918+
func TestSetLeaderTermZeroGroupIDDoesNotPolluteVirtualBucket(t *testing.T) {
919+
t.Parallel()
920+
s, _ := setupOneIndividualPlusVirtualBucket(t)
921+
s.SetLeaderTerm(0, 999) // bug case: caller passes the reserved groupID
922+
s.Observe(2, OpWrite, 16, 64)
923+
s.Flush()
924+
cols := s.Snapshot(time.Time{}, time.Time{})
925+
if len(cols) == 0 {
926+
t.Fatalf("expected at least one column")
927+
}
928+
var virtualRow *MatrixRow
929+
for i := range cols[0].Rows {
930+
if cols[0].Rows[i].Aggregate {
931+
virtualRow = &cols[0].Rows[i]
932+
break
933+
}
934+
}
935+
if virtualRow == nil {
936+
t.Fatalf("expected an aggregate row in column 0; rows=%+v", cols[0].Rows)
937+
}
938+
if virtualRow.RaftGroupID != 0 {
939+
t.Errorf("virtual bucket RaftGroupID = %d, want 0", virtualRow.RaftGroupID)
940+
}
941+
if virtualRow.LeaderTerm != 0 {
942+
t.Errorf("virtual bucket LeaderTerm = %d, want 0 (groupID=0 is reserved for aggregate buckets)", virtualRow.LeaderTerm)
943+
}
944+
}
945+
946+
// TestVirtualBucketRaftGroupIDIsZero pins that an over-budget route
947+
// folded into a virtual aggregate bucket emits RaftGroupID=0 even
948+
// when the member routes themselves were registered with non-zero
949+
// groupIDs. The fan-out aggregator relies on this to fall back to
950+
// max-merge for aggregate rows; without it, propagating a member's
951+
// groupID into the bucket would silently re-engage per-term dedupe
952+
// on rows that span multiple Raft groups.
953+
func TestVirtualBucketRaftGroupIDIsZero(t *testing.T) {
954+
t.Parallel()
955+
s, _ := newTestSampler(t, MemSamplerOptions{
956+
Step: time.Second,
957+
HistoryColumns: 4,
958+
MaxTrackedRoutes: 1,
959+
})
960+
if !s.RegisterRoute(1, []byte("a"), []byte("b"), 42) {
961+
t.Fatal("route 1 should fit (group 42)")
962+
}
963+
if s.RegisterRoute(2, []byte("c"), []byte("d"), 42) {
964+
t.Fatal("route 2 should fold into virtual bucket (group 42, over budget)")
965+
}
966+
s.Observe(2, OpWrite, 16, 64)
967+
s.Flush()
968+
cols := s.Snapshot(time.Time{}, time.Time{})
969+
if len(cols) == 0 {
970+
t.Fatalf("expected at least one column")
971+
}
972+
var virtualRow *MatrixRow
973+
for i := range cols[0].Rows {
974+
if cols[0].Rows[i].Aggregate {
975+
virtualRow = &cols[0].Rows[i]
976+
break
977+
}
978+
}
979+
if virtualRow == nil {
980+
t.Fatalf("expected an aggregate row in column 0; rows=%+v", cols[0].Rows)
981+
}
982+
if virtualRow.RaftGroupID != 0 {
983+
t.Errorf("virtual bucket RaftGroupID = %d, want 0 (member groupID=42 must not propagate)", virtualRow.RaftGroupID)
984+
}
985+
}
986+
987+
// TestRegisterRouteSecondCallIgnoresGroupID pins the live-slot
988+
// idempotency contract documented on RegisterRoute: when the slot is
989+
// already live, a second RegisterRoute call returns true without
990+
// updating the slot's metadata, including GroupID. Callers that need
991+
// to change a live slot's GroupID must RemoveRoute + RegisterRoute,
992+
// not call RegisterRoute again with a different groupID.
993+
func TestRegisterRouteSecondCallIgnoresGroupID(t *testing.T) {
994+
t.Parallel()
995+
s, _ := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 4})
996+
if !s.RegisterRoute(1, []byte("a"), []byte("b"), 10) {
997+
t.Fatal("first RegisterRoute returned false")
998+
}
999+
if !s.RegisterRoute(1, []byte("a"), []byte("b"), 99) {
1000+
t.Fatal("second RegisterRoute should be a no-op returning true")
1001+
}
1002+
s.Observe(1, OpWrite, 16, 64)
1003+
s.Flush()
1004+
cols := s.Snapshot(time.Time{}, time.Time{})
1005+
if len(cols) != 1 || len(cols[0].Rows) != 1 {
1006+
t.Fatalf("unexpected snapshot: %+v", cols)
1007+
}
1008+
if got := cols[0].Rows[0].RaftGroupID; got != 10 {
1009+
t.Errorf("RaftGroupID = %d, want 10 (second call's groupID=99 must be ignored)", got)
1010+
}
1011+
}
1012+
9111013
// negative MaxTrackedRoutes used to bypass the zero-check and force
9121014
// every route into a virtual bucket. Confirm both zero and negative
9131015
// inputs land on the documented defaults so a bad CLI/env value

0 commit comments

Comments
 (0)