Skip to content

Commit f9f7d59

Browse files
committed
keyviz: cap bucket MemberRoutes + clear retired tails
Codex round-7 P2 #1: foldIntoBucket appended every over-budget route ID to bucket.MemberRoutes with no bound. Per-column payload size scaled with total folded routes — defeating the MaxTrackedRoutes cap when total routes greatly exceed it. Add MaxMemberRoutesPerSlot (default 256, configurable via MemSamplerOptions). Past the cap the bucket counters still absorb the route's traffic, but the routeID is not appended. TestMemberRoutesCappedAtConfiguredCap pins both halves. Codex round-7 P2 #2: drainRetiredSlots and advancePendingPrunes compacted via keep := s[:0]; the dropped tail of the backing array retained *routeSlot / *bucket pointers, keeping released slots GC-reachable through the reused capacity. Add clearTail / clearPruneTail helpers that zero the dropped tail before returning. Regression test TestRetiredTailClearedAfterDrop holds a header into the original backing array and verifies index 0 is nilled after the drop.
1 parent 1b15db2 commit f9f7d59

2 files changed

Lines changed: 114 additions & 8 deletions

File tree

keyviz/sampler.go

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,10 @@ type Sampler interface {
6767

6868
// Defaults for MemSamplerOptions when fields are left zero.
6969
const (
70-
DefaultStep = 60 * time.Second
71-
DefaultHistoryColumns = 1440 // 24 hours at 60s steps.
72-
DefaultMaxTrackedRoutes = 10_000
70+
DefaultStep = 60 * time.Second
71+
DefaultHistoryColumns = 1440 // 24 hours at 60s steps.
72+
DefaultMaxTrackedRoutes = 10_000
73+
DefaultMaxMemberRoutesPerSlot = 256
7374
)
7475

7576
// MemSamplerOptions configures NewMemSampler. Zero values fall back to
@@ -86,6 +87,14 @@ type MemSamplerOptions struct {
8687
// returns false past this cap, the route ID maps into a virtual
8788
// bucket, and Snapshot reports it with Aggregate=true.
8889
MaxTrackedRoutes int
90+
// MaxMemberRoutesPerSlot caps how many distinct RouteIDs a single
91+
// virtual bucket records in MemberRoutes. Beyond this cap the
92+
// route still folds into the bucket counters (so traffic is not
93+
// dropped) but the routeID is not appended — keeping per-column
94+
// payload size bounded when total routes far exceed
95+
// MaxTrackedRoutes. Snapshot consumers should treat the list as
96+
// "first N members" rather than authoritative attribution.
97+
MaxMemberRoutesPerSlot int
8998
// Now overrides time.Now for tests; nil falls back to time.Now.
9099
Now func() time.Time
91100
}
@@ -267,6 +276,9 @@ func NewMemSampler(opts MemSamplerOptions) *MemSampler {
267276
if opts.MaxTrackedRoutes == 0 {
268277
opts.MaxTrackedRoutes = DefaultMaxTrackedRoutes
269278
}
279+
if opts.MaxMemberRoutesPerSlot <= 0 {
280+
opts.MaxMemberRoutesPerSlot = DefaultMaxMemberRoutesPerSlot
281+
}
270282
now := opts.Now
271283
if now == nil {
272284
now = time.Now
@@ -371,7 +383,7 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool {
371383
}
372384
next.sortedSlots = appendSorted(next.sortedSlots, bucket)
373385
} else {
374-
foldIntoBucket(next, bucket, routeID, start, end)
386+
s.foldIntoBucket(next, bucket, routeID, start, end)
375387
}
376388
next.virtualForRoute[routeID] = bucket
377389
s.table.Store(next)
@@ -385,9 +397,15 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool {
385397
// Counters live next to the metadata but are protected by their own
386398
// atomic ops, not metaMu. If Start is lowered, sortedSlots is rebuilt
387399
// to preserve Flush's key-order contract.
388-
func foldIntoBucket(next *routeTable, bucket *routeSlot, routeID uint64, start, end []byte) {
400+
//
401+
// MemberRoutes growth is capped by MaxMemberRoutesPerSlot — beyond
402+
// that cap the bucket counters still absorb the route's traffic, but
403+
// the routeID is not added to the visible member list.
404+
func (s *MemSampler) foldIntoBucket(next *routeTable, bucket *routeSlot, routeID uint64, start, end []byte) {
389405
bucket.metaMu.Lock()
390-
bucket.MemberRoutes = append(bucket.MemberRoutes, routeID)
406+
if len(bucket.MemberRoutes) < s.opts.MaxMemberRoutesPerSlot {
407+
bucket.MemberRoutes = append(bucket.MemberRoutes, routeID)
408+
}
391409
if len(end) == 0 || (len(bucket.End) != 0 && bytesGT(end, bucket.End)) {
392410
bucket.End = cloneBytes(end)
393411
}
@@ -502,7 +520,9 @@ func (s *MemSampler) Flush() {
502520
// entries whose grace window has not yet elapsed. Rows are appended
503521
// to *rows so the caller sees the slice growth. Entries whose
504522
// elapsed time (now - retiredAt) has reached grace are dropped after
505-
// this final drain.
523+
// this final drain. The dropped tail of the backing array is zeroed
524+
// so released *routeSlot pointers do not stay GC-reachable through
525+
// the reused capacity.
506526
func drainRetiredSlots(retired []retiredSlot, rows *[]MatrixRow, now time.Time, grace time.Duration) []retiredSlot {
507527
keep := retired[:0]
508528
for _, r := range retired {
@@ -511,13 +531,15 @@ func drainRetiredSlots(retired []retiredSlot, rows *[]MatrixRow, now time.Time,
511531
keep = append(keep, r)
512532
}
513533
}
534+
clearTail(retired, len(keep))
514535
return keep
515536
}
516537

517538
// advancePendingPrunes lets each pending member-prune live until its
518539
// retiredAt+grace passes, then actually prunes the routeID from the
519540
// bucket's MemberRoutes. Returns the entries still inside the grace
520-
// window.
541+
// window. Like drainRetiredSlots, the dropped tail is zeroed so
542+
// released bucket pointers don't linger via the reused capacity.
521543
func advancePendingPrunes(pending []memberPrune, now time.Time, grace time.Duration) []memberPrune {
522544
keep := pending[:0]
523545
for _, p := range pending {
@@ -527,9 +549,26 @@ func advancePendingPrunes(pending []memberPrune, now time.Time, grace time.Durat
527549
}
528550
pruneMemberRoute(p.bucket, p.routeID)
529551
}
552+
clearPruneTail(pending, len(keep))
530553
return keep
531554
}
532555

556+
// clearTail zeroes the [keepLen, len(s)) range of s so dropped entries
557+
// don't keep their *routeSlot pointers GC-reachable through the
558+
// reused backing array.
559+
func clearTail(s []retiredSlot, keepLen int) {
560+
for i := keepLen; i < len(s); i++ {
561+
s[i] = retiredSlot{}
562+
}
563+
}
564+
565+
// clearPruneTail is clearTail for the pendingPrunes queue.
566+
func clearPruneTail(s []memberPrune, keepLen int) {
567+
for i := keepLen; i < len(s); i++ {
568+
s[i] = memberPrune{}
569+
}
570+
}
571+
533572
// bucketStillReferenced reports whether any RouteID in
534573
// virtualForRoute still maps to bucket. Used by RemoveRoute to detect
535574
// when a virtual bucket has lost its last member and must be retired

keyviz/sampler_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,73 @@ func TestSnapshotReturnsDeepCopy(t *testing.T) {
687687
}
688688
}
689689

690+
// TestMemberRoutesCappedAtConfiguredCap pins Codex round-7 P2: per-
691+
// bucket MemberRoutes growth is bounded by MaxMemberRoutesPerSlot, so
692+
// flushed columns don't scale with total folded routes when the
693+
// deployment route count vastly exceeds MaxTrackedRoutes.
694+
func TestMemberRoutesCappedAtConfiguredCap(t *testing.T) {
695+
t.Parallel()
696+
s, _ := newTestSampler(t, MemSamplerOptions{
697+
Step: time.Second,
698+
HistoryColumns: 4,
699+
MaxTrackedRoutes: 1,
700+
MaxMemberRoutesPerSlot: 3,
701+
})
702+
mustRegister(t, s, 1, "a", "b")
703+
for i := uint64(2); i < 10; i++ {
704+
key := []byte{byte('a' + i)}
705+
if s.RegisterRoute(i, key, append(key, 'z')) {
706+
t.Fatalf("route %d should fold (over budget)", i)
707+
}
708+
s.Observe(i, OpRead, 1, 0)
709+
}
710+
s.Flush()
711+
rows := lastSnapshotColumn(t, s).Rows
712+
agg := findAggregateRow(t, rows)
713+
if len(agg.MemberRoutes) > 3 {
714+
t.Fatalf("MemberRoutes exceeds cap=3: %v", agg.MemberRoutes)
715+
}
716+
// All 8 over-budget routes still drove the bucket counters even
717+
// though only the first 3 are recorded as members.
718+
if agg.Reads != 8 {
719+
t.Fatalf("bucket Reads = %d, want 8 (counters must absorb traffic past cap)", agg.Reads)
720+
}
721+
}
722+
723+
// TestRetiredTailClearedAfterDrop pins Codex round-7 P2: after a
724+
// retired slot's grace expires and drainRetiredSlots drops the entry,
725+
// the *routeSlot pointer in the dropped tail of the backing array
726+
// must be zeroed so it doesn't keep the slot GC-reachable through
727+
// the reused capacity.
728+
func TestRetiredTailClearedAfterDrop(t *testing.T) {
729+
t.Parallel()
730+
s, clk := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 4})
731+
mustRegister(t, s, 1, "a", "b")
732+
s.Observe(1, OpRead, 1, 1)
733+
s.RemoveRoute(1)
734+
735+
s.retiredMu.Lock()
736+
if len(s.retiredSlots) != 1 {
737+
s.retiredMu.Unlock()
738+
t.Fatalf("expected 1 retired slot pre-flush, got %d", len(s.retiredSlots))
739+
}
740+
orig := s.retiredSlots[:1:1] // slice header pinning index 0 of the backing array
741+
s.retiredMu.Unlock()
742+
743+
clk.Advance(s.graceWindow() + time.Second)
744+
s.Flush()
745+
746+
s.retiredMu.Lock()
747+
leftover := len(s.retiredSlots)
748+
s.retiredMu.Unlock()
749+
if leftover != 0 {
750+
t.Fatalf("expected drain to drop entry, len=%d", leftover)
751+
}
752+
if orig[0].slot != nil {
753+
t.Fatal("dropped retiredSlot.slot pointer not zeroed in backing array — GC retention leak")
754+
}
755+
}
756+
690757
// BenchmarkObserveHit pins the hot-path properties claimed in the
691758
// package doc: a single atomic.Pointer.Load, a map lookup, and at
692759
// most two atomic.AddUint64 calls — no allocation, no mutex. Use

0 commit comments

Comments
 (0)