Skip to content

Commit e4b11af

Browse files
committed
keyviz: track hidden bucket members for accurate route_count
Round-2 review fixes for PR #646 (Codex P2 ×2): P2 #1: foldIntoBucket dedup only consulted the visible MemberRoutes slice, so re-registering a past-cap (hidden) routeID inside the prune grace incremented MemberRoutesTotal a second time — route_count drifted upward on remove/re-register churn even though the bucket membership didn't grow. P2 #2: pruneMemberRoute only decremented MemberRoutesTotal for routes visible in MemberRoutes. Past-cap members were absent from that list, so removals never reduced the total — route_count stayed permanently inflated and route_ids_truncated stayed permanently true even after the route was fully gone. Add a `hiddenMembers map[uint64]struct{}` set on routeSlot. Past-cap folds land there instead of being silently dropped from accounting: - foldIntoBucket dedups against both MemberRoutes and hiddenMembers before incrementing. - pruneMemberRoute checks hiddenMembers when the routeID isn't in the visible list, deletes from the set (releasing the map when it empties), and decrements the total. Refactor: extract addMemberToBucket helper so foldIntoBucket stays under the cyclop budget. Tests: - TestPastCapMemberRejoinDoesNotInflateTotal — Remove + Register a hidden member → MemberRoutesTotal stays the same, no drift. - TestPastCapMemberPruneDecrementsTotal — Remove + grace expiry of a hidden member → MemberRoutesTotal decrements after the prune fires on the second post-grace flush. - Updated TestMemberRoutesCappedAtConfiguredCap to assert MemberRoutesTotal=8 alongside the existing visible-cap=3 check.
1 parent 237a090 commit e4b11af

2 files changed

Lines changed: 143 additions & 19 deletions

File tree

keyviz/sampler.go

Lines changed: 55 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -226,11 +226,16 @@ type routeSlot struct {
226226
// routes together (Snapshot surfaces this in MatrixRow).
227227
Aggregate bool
228228
MemberRoutes []uint64
229-
// MemberRoutesTotal counts every distinct routeID that has folded
230-
// into this bucket, including ones beyond MaxMemberRoutesPerSlot
231-
// (which still contribute to the counters but are not appended to
232-
// MemberRoutes). Always equals len(MemberRoutes) for individual
233-
// (non-Aggregate) slots.
229+
// hiddenMembers stores routeIDs folded past
230+
// MaxMemberRoutesPerSlot. Used to dedup re-folds (so
231+
// MemberRoutesTotal doesn't drift on remove+re-register churn for
232+
// past-cap routes) and to drive accurate decrements in
233+
// pruneMemberRoute. nil for individual slots.
234+
hiddenMembers map[uint64]struct{}
235+
// MemberRoutesTotal is len(MemberRoutes) + len(hiddenMembers) — the
236+
// authoritative count of distinct routes folded into this bucket.
237+
// Always equals len(MemberRoutes) for individual (non-Aggregate)
238+
// slots.
234239
MemberRoutesTotal uint64
235240

236241
reads atomic.Uint64
@@ -455,16 +460,12 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool {
455460
// to preserve Flush's key-order contract.
456461
//
457462
// MemberRoutes growth is capped by MaxMemberRoutesPerSlot — beyond
458-
// that cap the bucket counters still absorb the route's traffic, but
459-
// the routeID is not added to the visible member list.
463+
// that cap the bucket counters still absorb the route's traffic and
464+
// the routeID is recorded in hiddenMembers (so dedup is correct on
465+
// rejoin) but not appended to the visible MemberRoutes list.
460466
func (s *MemSampler) foldIntoBucket(next *routeTable, bucket *routeSlot, routeID uint64, start, end []byte) {
461467
bucket.metaMu.Lock()
462-
if !memberRoutesContains(bucket.MemberRoutes, routeID) {
463-
bucket.MemberRoutesTotal++
464-
if len(bucket.MemberRoutes) < s.opts.MaxMemberRoutesPerSlot {
465-
bucket.MemberRoutes = append(bucket.MemberRoutes, routeID)
466-
}
467-
}
468+
addMemberToBucket(bucket, routeID, s.opts.MaxMemberRoutesPerSlot)
468469
if len(end) == 0 || (len(bucket.End) != 0 && bytesGT(end, bucket.End)) {
469470
bucket.End = cloneBytes(end)
470471
}
@@ -478,6 +479,31 @@ func (s *MemSampler) foldIntoBucket(next *routeTable, bucket *routeSlot, routeID
478479
}
479480
}
480481

482+
// addMemberToBucket records routeID against bucket, choosing the
483+
// visible MemberRoutes list when there's room and falling back to
484+
// the hiddenMembers set past the cap. Both lists are deduped so a
485+
// rejoin during the prune grace doesn't inflate MemberRoutesTotal.
486+
// Caller holds bucket.metaMu.
487+
func addMemberToBucket(bucket *routeSlot, routeID uint64, visibleCap int) {
488+
if memberRoutesContains(bucket.MemberRoutes, routeID) {
489+
return
490+
}
491+
if bucket.hiddenMembers != nil {
492+
if _, ok := bucket.hiddenMembers[routeID]; ok {
493+
return
494+
}
495+
}
496+
if len(bucket.MemberRoutes) < visibleCap {
497+
bucket.MemberRoutes = append(bucket.MemberRoutes, routeID)
498+
} else {
499+
if bucket.hiddenMembers == nil {
500+
bucket.hiddenMembers = make(map[uint64]struct{})
501+
}
502+
bucket.hiddenMembers[routeID] = struct{}{}
503+
}
504+
bucket.MemberRoutesTotal++
505+
}
506+
481507
// RemoveRoute drops a RouteID from tracking. Counts accumulated since
482508
// the last flush are NOT lost: the retired slot (or, for virtual-bucket
483509
// members, just the membership entry) is queued for one final drain by
@@ -719,12 +745,13 @@ func bucketStillReferenced(virtualForRoute map[uint64]*routeSlot, bucket *routeS
719745
return false
720746
}
721747

722-
// pruneMemberRoute removes routeID from bucket.MemberRoutes under the
723-
// bucket's metaMu so a concurrent snapshotMeta reader sees a
724-
// consistent view. MemberRoutesTotal is decremented when the routeID
725-
// was visible in MemberRoutes (the only case we can confidently
726-
// account for) — routes pruned past the visible cap stay in the
727-
// total because we don't track individual past-cap members.
748+
// pruneMemberRoute removes routeID from bucket.MemberRoutes (or
749+
// hiddenMembers) under the bucket's metaMu so a concurrent
750+
// snapshotMeta reader sees a consistent view. MemberRoutesTotal is
751+
// decremented whenever the routeID was actually present in either
752+
// list — including past-cap members in hiddenMembers — so the
753+
// reported route_count stays truthful across remove/re-register
754+
// churn.
728755
func pruneMemberRoute(bucket *routeSlot, routeID uint64) {
729756
bucket.metaMu.Lock()
730757
defer bucket.metaMu.Unlock()
@@ -738,6 +765,15 @@ func pruneMemberRoute(bucket *routeSlot, routeID uint64) {
738765
filtered = append(filtered, m)
739766
}
740767
bucket.MemberRoutes = filtered
768+
if !removed && bucket.hiddenMembers != nil {
769+
if _, ok := bucket.hiddenMembers[routeID]; ok {
770+
delete(bucket.hiddenMembers, routeID)
771+
if len(bucket.hiddenMembers) == 0 {
772+
bucket.hiddenMembers = nil
773+
}
774+
removed = true
775+
}
776+
}
741777
if removed && bucket.MemberRoutesTotal > 0 {
742778
bucket.MemberRoutesTotal--
743779
}

keyviz/sampler_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -916,6 +916,94 @@ func TestMemberRoutesCappedAtConfiguredCap(t *testing.T) {
916916
if agg.Reads != 8 {
917917
t.Fatalf("bucket Reads = %d, want 8 (counters must absorb traffic past cap)", agg.Reads)
918918
}
919+
if agg.MemberRoutesTotal != 8 {
920+
t.Fatalf("MemberRoutesTotal = %d, want 8 (true count of contributors past cap)", agg.MemberRoutesTotal)
921+
}
922+
}
923+
924+
// TestPastCapMemberRejoinDoesNotInflateTotal pins Codex round-2 P2
925+
// on PR #646: re-registering a hidden (past-cap) routeID inside the
926+
// prune grace must not double-count MemberRoutesTotal. The hidden
927+
// member set serves as dedup so route_count stays truthful across
928+
// remove/re-register churn for routes the visible MemberRoutes list
929+
// never carried.
930+
func TestPastCapMemberRejoinDoesNotInflateTotal(t *testing.T) {
931+
t.Parallel()
932+
s, _ := newTestSampler(t, MemSamplerOptions{
933+
Step: time.Second,
934+
HistoryColumns: 4,
935+
MaxTrackedRoutes: 1,
936+
MaxMemberRoutesPerSlot: 1, // visible cap=1; routes 3+4 land in hidden set
937+
})
938+
mustRegister(t, s, 1, "a", "b")
939+
if s.RegisterRoute(2, []byte("c"), []byte("d")) {
940+
t.Fatal("route 2 should fold (visible)")
941+
}
942+
if s.RegisterRoute(3, []byte("e"), []byte("f")) {
943+
t.Fatal("route 3 should fold (hidden — past cap)")
944+
}
945+
if s.RegisterRoute(4, []byte("g"), []byte("h")) {
946+
t.Fatal("route 4 should fold (hidden — past cap)")
947+
}
948+
s.Observe(2, OpRead, 0, 0)
949+
s.Flush()
950+
beforeTotal := findAggregateRow(t, lastSnapshotColumn(t, s).Rows).MemberRoutesTotal
951+
if beforeTotal != 3 {
952+
t.Fatalf("baseline MemberRoutesTotal = %d, want 3", beforeTotal)
953+
}
954+
// Remove the past-cap route 3 and re-register it inside the prune
955+
// grace. Without the hidden-member dedup foldIntoBucket would
956+
// increment MemberRoutesTotal again, drifting route_count up.
957+
s.RemoveRoute(3)
958+
if s.RegisterRoute(3, []byte("e"), []byte("f")) {
959+
t.Fatal("route 3 should fold again")
960+
}
961+
s.Observe(2, OpRead, 0, 0)
962+
s.Flush()
963+
afterTotal := findAggregateRow(t, lastSnapshotColumn(t, s).Rows).MemberRoutesTotal
964+
if afterTotal != 3 {
965+
t.Fatalf("after remove+re-register, MemberRoutesTotal = %d, want 3 (no drift)", afterTotal)
966+
}
967+
}
968+
969+
// TestPastCapMemberPruneDecrementsTotal pins the other half of Codex
970+
// round-2 P2: when grace expires for a hidden (past-cap) member, the
971+
// prune must decrement MemberRoutesTotal — otherwise route_count
972+
// stays inflated even after the route is fully retired.
973+
func TestPastCapMemberPruneDecrementsTotal(t *testing.T) {
974+
t.Parallel()
975+
s, clk := newTestSampler(t, MemSamplerOptions{
976+
Step: time.Second,
977+
HistoryColumns: 8,
978+
MaxTrackedRoutes: 1,
979+
MaxMemberRoutesPerSlot: 1,
980+
})
981+
mustRegister(t, s, 1, "a", "b")
982+
if s.RegisterRoute(2, []byte("c"), []byte("d")) {
983+
t.Fatal("route 2 should fold (visible)")
984+
}
985+
if s.RegisterRoute(3, []byte("e"), []byte("f")) {
986+
t.Fatal("route 3 should fold (hidden — past cap)")
987+
}
988+
s.Observe(2, OpRead, 0, 0)
989+
s.Flush()
990+
require := func(want uint64, ctx string) {
991+
t.Helper()
992+
s.Observe(2, OpRead, 0, 0)
993+
s.Flush()
994+
got := findAggregateRow(t, lastSnapshotColumn(t, s).Rows).MemberRoutesTotal
995+
if got != want {
996+
t.Fatalf("%s: MemberRoutesTotal = %d, want %d", ctx, got, want)
997+
}
998+
}
999+
require(2, "baseline")
1000+
s.RemoveRoute(3) // hidden member — pendingPrune queued
1001+
clk.Advance(s.graceWindow() + time.Second)
1002+
// First flush past grace: drains the bucket (snapshot still
1003+
// shows Total=2) THEN fires the pending prune. The next flush is
1004+
// the first one whose drain reflects the post-prune state.
1005+
require(2, "drain runs before prune fires")
1006+
require(1, "after past-cap prune fires")
9191007
}
9201008

9211009
// TestRetiredTailClearedAfterDrop pins Codex round-7 P2: after a

0 commit comments

Comments
 (0)