Skip to content

Commit 4c60906

Browse files
committed
keyviz: reuse retired slot on re-registration + test polish
Codex round-12 P2: Re-registering a removed RouteID inside the grace window allocated a brand-new *routeSlot. Flush then drained both the new live slot AND the old retired slot in the same column, producing two rows with the same RouteID and splitting counts between them. Add reclaimRetiredSlot — when RegisterRoute lands a real (non-aggregate) routeID and a retired slot with that ID is queued, pull it off retiredSlots and reuse it. The old counters and any in-flight late-Observe writes converge with new traffic on the same slot, and Flush emits a single row. TestReRegisterIndividualReusesRetiredSlot asserts one row per RouteID per column with correctly accumulated counts. Claude bot nits: - nextVirtualBucketID comment now spells out the atomic.Uint64 wrap-around explicitly instead of the cryptic "MaxUint64+1" form. - TestRejoinAsIndividualLetsBucketPruneFire was vacuously true — the bucket had no traffic so it never emitted a row, making the prune assertion unreachable. Restructure with two virtual members (so removing one leaves the bucket alive, not orphaned), drive traffic through the surviving member, and flush twice past grace so the post-prune MemberRoutes lands in a snapshot row.
1 parent 25061d6 commit 4c60906

2 files changed

Lines changed: 112 additions & 21 deletions

File tree

keyviz/sampler.go

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -376,10 +376,25 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool {
376376

377377
next := copyRouteTable(cur)
378378
if len(next.slots) < s.opts.MaxTrackedRoutes {
379-
slot := &routeSlot{
380-
RouteID: routeID,
381-
Start: cloneBytes(start),
382-
End: cloneBytes(end),
379+
slot := s.reclaimRetiredSlot(routeID)
380+
if slot == nil {
381+
slot = &routeSlot{
382+
RouteID: routeID,
383+
Start: cloneBytes(start),
384+
End: cloneBytes(end),
385+
}
386+
} else {
387+
// Re-registering the same routeID inside the grace window:
388+
// reuse the retired slot so any in-flight Observe writers
389+
// hitting the prior table snapshot land on the same slot
390+
// the new traffic uses, and Flush emits a single row per
391+
// RouteID instead of two (one live + one retired) with
392+
// counts split across them. Refresh the metadata fields to
393+
// match the new registration; counters are preserved.
394+
slot.metaMu.Lock()
395+
slot.Start = cloneBytes(start)
396+
slot.End = cloneBytes(end)
397+
slot.metaMu.Unlock()
383398
}
384399
next.slots[routeID] = slot
385400
next.sortedSlots = appendSorted(next.sortedSlots, slot)
@@ -613,7 +628,36 @@ func (s *MemSampler) cancelPendingPruneFor(bucket *routeSlot, routeID uint64) {
613628
// aggregate, one individual — if the original route is later
614629
// re-registered as an individual slot.
615630
func (s *MemSampler) nextVirtualBucketID() uint64 {
616-
return s.virtualIDCounter.Add(^uint64(0)) // subtract 1; counter starts at MaxUint64+1
631+
// atomic.Uint64 starts at 0; adding ^uint64(0) wraps to MaxUint64
632+
// on the first call, MaxUint64-1 on the second, etc.
633+
return s.virtualIDCounter.Add(^uint64(0))
634+
}
635+
636+
// reclaimRetiredSlot looks for a non-aggregate retired slot whose
637+
// RouteID matches the supplied routeID and, if found, removes it
638+
// from retiredSlots and returns it for reuse. This guarantees a
639+
// route removed and re-registered inside the grace window is
640+
// represented by a single *routeSlot — Flush would otherwise emit
641+
// two rows with the same RouteID (one from the new live slot, one
642+
// from the still-draining retired slot) and split counts across
643+
// them. Aggregate (orphaned virtual bucket) entries are left in
644+
// place because their RouteID lives in the synthetic namespace and
645+
// a real-ID match against an aggregate would be coincidental.
646+
func (s *MemSampler) reclaimRetiredSlot(routeID uint64) *routeSlot {
647+
s.retiredMu.Lock()
648+
defer s.retiredMu.Unlock()
649+
var reclaimed *routeSlot
650+
keep := s.retiredSlots[:0]
651+
for _, r := range s.retiredSlots {
652+
if reclaimed == nil && !r.slot.Aggregate && r.slot.RouteID == routeID {
653+
reclaimed = r.slot
654+
continue
655+
}
656+
keep = append(keep, r)
657+
}
658+
clearTail(s.retiredSlots, len(keep))
659+
s.retiredSlots = keep
660+
return reclaimed
617661
}
618662

619663
// memberRoutesContains reports whether routeID is already listed in

keyviz/sampler_test.go

Lines changed: 63 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -718,41 +718,88 @@ func TestVirtualBucketRouteIDIsSynthetic(t *testing.T) {
718718
}
719719

720720
// TestRejoinAsIndividualLetsBucketPruneFire pins Codex round-11 P2:
721-
// when a removed virtual member rejoins as an individual slot (capacity
722-
// freed up), the deferred member-prune for the old bucket must still
723-
// execute — otherwise the bucket's MemberRoutes keeps a route that no
724-
// longer contributes traffic to it.
721+
// when a removed virtual member rejoins as an individual slot, the
722+
// deferred member-prune for the old bucket must still execute —
723+
// otherwise the bucket's MemberRoutes keeps a route that no longer
724+
// contributes traffic to it. Set up the bucket with two members so
725+
// removing one leaves the bucket alive (not orphaned), and traffic
726+
// continues to be drained across the grace boundary; the post-grace
727+
// row must show the prune actually happened.
725728
func TestRejoinAsIndividualLetsBucketPruneFire(t *testing.T) {
726729
t.Parallel()
727730
s, clk := newTestSampler(t, MemSamplerOptions{
728731
Step: time.Second,
729-
HistoryColumns: 4,
730-
MaxTrackedRoutes: 2,
732+
HistoryColumns: 8,
733+
MaxTrackedRoutes: 1,
731734
})
732735
mustRegister(t, s, 1, "a", "b")
733-
mustRegister(t, s, 2, "m", "n")
736+
if s.RegisterRoute(2, []byte("m"), []byte("n")) {
737+
t.Fatal("route 2 should fold")
738+
}
734739
if s.RegisterRoute(3, []byte("y"), []byte("z")) {
735-
t.Fatal("route 3 over budget should fold")
740+
t.Fatal("route 3 should fold (same bucket)")
736741
}
737-
// Free capacity: remove route 2 (now slots have room) AND remove 3
738-
// (queued in pendingPrunes). Re-register 3 — it now fits as an
739-
// individual slot, so the old bucket should still get pruned.
742+
s.Observe(2, OpRead, 0, 0)
743+
s.Observe(3, OpRead, 0, 0)
744+
// Free capacity (route 1) and remove the virtual member (route 3),
745+
// which queues a prune against the bucket. Route 2 still keeps the
746+
// bucket alive in virtualForRoute.
747+
s.RemoveRoute(1)
740748
s.RemoveRoute(3)
741-
s.RemoveRoute(2)
742749
if !s.RegisterRoute(3, []byte("y"), []byte("z")) {
743-
t.Fatal("route 3 should now fit as individual slot")
750+
t.Fatal("route 3 should fit individually now")
744751
}
745752

753+
// Two flushes after grace: the first executes the prune, the
754+
// second emits a row with the post-prune MemberRoutes.
746755
clk.Advance(s.graceWindow() + time.Second)
747-
s.Observe(3, OpRead, 0, 0)
756+
s.Observe(2, OpRead, 0, 0)
757+
s.Flush()
758+
s.Observe(2, OpRead, 0, 0)
748759
s.Flush()
749760

750761
rows := lastSnapshotColumn(t, s).Rows
762+
agg := findAggregateRow(t, rows)
763+
if memberRoutesContain(agg.MemberRoutes, 3) {
764+
t.Fatalf("bucket still lists pruned route 3: %v", agg.MemberRoutes)
765+
}
766+
if !memberRoutesContain(agg.MemberRoutes, 2) {
767+
t.Fatalf("bucket dropped still-active route 2: %v", agg.MemberRoutes)
768+
}
769+
}
770+
771+
// TestReRegisterIndividualReusesRetiredSlot pins Codex round-12 P2:
772+
// re-registering the same RouteID inside the grace window must reuse
773+
// the retired slot. Otherwise Flush emits two rows for the same
774+
// RouteID in one column (live + retired drain), splitting counts.
775+
func TestReRegisterIndividualReusesRetiredSlot(t *testing.T) {
776+
t.Parallel()
777+
s, _ := newTestSampler(t, MemSamplerOptions{
778+
Step: time.Second,
779+
HistoryColumns: 4,
780+
})
781+
mustRegister(t, s, 1, "a", "b")
782+
s.Observe(1, OpRead, 0, 0)
783+
s.RemoveRoute(1)
784+
mustRegister(t, s, 1, "a", "b")
785+
s.Observe(1, OpRead, 0, 0)
786+
s.Flush()
787+
788+
rows := lastSnapshotColumn(t, s).Rows
789+
count := 0
790+
var total uint64
751791
for _, r := range rows {
752-
if r.Aggregate && memberRoutesContain(r.MemberRoutes, 3) {
753-
t.Fatalf("after rejoin-as-individual + grace, bucket still lists route 3: %+v", r)
792+
if r.RouteID == 1 {
793+
count++
794+
total += r.Reads
754795
}
755796
}
797+
if count != 1 {
798+
t.Fatalf("got %d rows for RouteID 1 in one column; expected 1 — retired slot was not reclaimed", count)
799+
}
800+
if total != 2 {
801+
t.Fatalf("total Reads for RouteID 1 = %d, want 2 (1 pre-remove + 1 post-rejoin)", total)
802+
}
756803
}
757804

758805
// TestReRegisterDuringPruneGraceCancelsPrune pins Codex round-9 P2:

0 commit comments

Comments
 (0)