Skip to content

Commit 1b15db2

Browse files
committed
keyviz: tie late-writer grace to wall clock + benchmarks
Codex round-6 P2: the late-writer grace was driven by a flush count (retainedFlushes=2). At small Step values (millisecond scale) that window can expire before a preempted Observe goroutine resumes, silently dropping its atomic.Add into a slot that has just been freed from retiredSlots. Switch to a wall-clock grace: each retiredSlot and memberPrune carries its retiredAt timestamp; Flush keeps the entry while now-retiredAt is below graceWindow = max(2*Step, 5s). The minGraceWindow floor protects sub-second Step configurations. Claude round-6 nits: - HistoryColumns <= 0 (not just == 0) now falls back to the default, so a negative value doesnt silently land at cap=1. - findVirtualBucket doc now describes the actual fallback (first aggregate in key order) instead of "closest to the right of start". - Package doc no longer asserts ApplySplit/ApplyMerge as current callers — flagged as future PR. Add BenchmarkObserveHit and BenchmarkObserveMiss so the hot-path properties (no alloc, lockless) are pinned ahead of coordinator wiring. Local M1 Max: 7 ns/op hit, 3.5 ns/op miss, 0 allocs.
1 parent 6346ac8 commit 1b15db2

2 files changed

Lines changed: 130 additions & 73 deletions

File tree

keyviz/sampler.go

Lines changed: 69 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@
2222
// - Flush drains the per-route counters with atomic.SwapUint64; no
2323
// pointer retirement, so a late writer cannot race past the snapshot
2424
// and lose counts.
25-
// - Adding / removing routes (RegisterRoute, RemoveRoute, ApplySplit,
26-
// ApplyMerge) builds a fresh routeTable copy under a non-hot-path
27-
// mutex and publishes it with a single atomic.Pointer.Store. Routes
28-
// mutated mid-step keep their counters in the new table by design.
25+
// - Adding / removing routes (RegisterRoute, RemoveRoute today;
26+
// ApplySplit / ApplyMerge in a future PR) builds a fresh
27+
// routeTable copy under a non-hot-path mutex and publishes it
28+
// with a single atomic.Pointer.Store. Routes mutated mid-step
29+
// keep their counters in the new table by design.
2930
package keyviz
3031

3132
import (
@@ -127,30 +128,49 @@ type MemSampler struct {
127128
}
128129

129130
// retiredSlot tracks a removed slot through its post-removal grace
130-
// period. remaining counts down to zero across successive Flushes.
131+
// period. retiredAt is the wall-clock instant of removal; the entry
132+
// is drained on every Flush whose `now()` is within the configured
133+
// grace window of retiredAt, then dropped.
131134
type retiredSlot struct {
132135
slot *routeSlot
133-
remaining int
136+
retiredAt time.Time
134137
}
135138

136139
// memberPrune tracks a deferred virtual-bucket member-route removal.
137-
// remaining counts down across Flushes; while it stays positive the
138-
// routeID is still advertised in bucket.MemberRoutes so the flushed
139-
// row's attribution matches the bucket counters that still include
140-
// pre-removal increments from this route.
140+
// retiredAt is the wall-clock instant of removal; while now() is
141+
// within the grace window, the routeID stays in bucket.MemberRoutes so
142+
// flushed rows continue attributing the bucket's mixed counters to all
143+
// members that contributed (including this one).
141144
type memberPrune struct {
142145
bucket *routeSlot
143146
routeID uint64
144-
remaining int
147+
retiredAt time.Time
145148
}
146149

147-
// retainedFlushes is how many Flush cycles a retired slot (or pending
148-
// member-prune) stays in the queue after RemoveRoute. Two cycles is
149-
// enough in practice: any goroutine still holding a pre-RemoveRoute
150-
// table snapshot will have completed its single atomic.Add long
151-
// before the second flush tick (Step is at human timescales,
152-
// microseconds suffice).
153-
const retainedFlushes = 2
150+
// minGraceWindow is the wall-clock floor for late-writer grace. It
151+
// guards against Step being configured small enough (millisecond
152+
// scale) that a flush-count-based grace window would expire before a
153+
// preempted Observe goroutine resumes. Any goroutine still holding a
154+
// pre-RemoveRoute table snapshot must finish its single atomic.Add
155+
// before now-retiredAt exceeds this window.
156+
const minGraceWindow = 5 * time.Second
157+
158+
// graceStepMultiplier is how many Step intervals we want the grace
159+
// window to cover by default — picked so retired slots are drained
160+
// at least twice before being dropped.
161+
const graceStepMultiplier = 2
162+
163+
// graceWindow is the duration retired slots and pending member-prunes
164+
// stay drainable after RemoveRoute. It's tied to wall-clock time, not
165+
// Flush count, so a small Step doesn't shrink the grace below
166+
// minGraceWindow.
167+
func (s *MemSampler) graceWindow() time.Duration {
168+
g := graceStepMultiplier * s.opts.Step
169+
if g < minGraceWindow {
170+
g = minGraceWindow
171+
}
172+
return g
173+
}
154174

155175
// routeTable is the COW snapshot Observe operates on. Once published
156176
// via MemSampler.table.Store, fields are read-only.
@@ -241,7 +261,7 @@ func NewMemSampler(opts MemSamplerOptions) *MemSampler {
241261
if opts.Step <= 0 {
242262
opts.Step = DefaultStep
243263
}
244-
if opts.HistoryColumns == 0 {
264+
if opts.HistoryColumns <= 0 {
245265
opts.HistoryColumns = DefaultHistoryColumns
246266
}
247267
if opts.MaxTrackedRoutes == 0 {
@@ -403,16 +423,17 @@ func (s *MemSampler) RemoveRoute(routeID uint64) {
403423
delete(next.slots, routeID)
404424
delete(next.virtualForRoute, routeID)
405425

426+
retiredAt := s.now()
406427
switch {
407428
case isIndividual:
408429
// Pending counters in this slot must be harvested by upcoming
409-
// Flush cycles. Drain across `retainedFlushes` cycles so an
410-
// Observe call that loaded the prior table just before this
430+
// Flush cycles. Drain across the grace window so an Observe
431+
// call that loaded the prior table just before this
411432
// atomic.Store can still complete its Add into the slot — that
412-
// in-flight write is caught by the next drain rather than
433+
// in-flight write is caught by a later drain rather than
413434
// silently lost.
414435
s.retiredMu.Lock()
415-
s.retiredSlots = append(s.retiredSlots, retiredSlot{slot: individual, remaining: retainedFlushes})
436+
s.retiredSlots = append(s.retiredSlots, retiredSlot{slot: individual, retiredAt: retiredAt})
416437
s.retiredMu.Unlock()
417438
case isVirtual:
418439
// Defer pruning until after the bucket's pre-removal counters
@@ -422,17 +443,17 @@ func (s *MemSampler) RemoveRoute(routeID uint64) {
422443
// to them — including the route we are removing.
423444
s.retiredMu.Lock()
424445
s.pendingPrunes = append(s.pendingPrunes, memberPrune{
425-
bucket: bucket, routeID: routeID, remaining: retainedFlushes,
446+
bucket: bucket, routeID: routeID, retiredAt: retiredAt,
426447
})
427448
// If this delete left the bucket with no remaining
428449
// virtualForRoute mapping, rebuildSorted will drop it from the
429450
// live sortedSlots. Queue it as a retired slot so Flush keeps
430-
// draining its counters across retainedFlushes — otherwise
451+
// draining its counters across the grace window — otherwise
431452
// pre-removal increments and any in-flight Observe writers
432453
// hitting the prior table snapshot would be silently lost.
433454
if !bucketStillReferenced(next.virtualForRoute, bucket) {
434455
s.retiredSlots = append(s.retiredSlots, retiredSlot{
435-
slot: bucket, remaining: retainedFlushes,
456+
slot: bucket, retiredAt: retiredAt,
436457
})
437458
}
438459
s.retiredMu.Unlock()
@@ -462,9 +483,10 @@ func (s *MemSampler) Flush() {
462483
col.Rows = appendDrainedRow(col.Rows, slot)
463484
}
464485

486+
grace := s.graceWindow()
465487
s.retiredMu.Lock()
466-
s.retiredSlots = drainRetiredSlots(s.retiredSlots, &col.Rows)
467-
s.pendingPrunes = advancePendingPrunes(s.pendingPrunes)
488+
s.retiredSlots = drainRetiredSlots(s.retiredSlots, &col.Rows, col.At, grace)
489+
s.pendingPrunes = advancePendingPrunes(s.pendingPrunes, col.At, grace)
468490
s.retiredMu.Unlock()
469491

470492
sort.SliceStable(col.Rows, func(i, j int) bool {
@@ -476,31 +498,30 @@ func (s *MemSampler) Flush() {
476498
s.historyMu.Unlock()
477499
}
478500

479-
// drainRetiredSlots emits a row for each retired slot, decrements the
480-
// per-entry grace counter, and returns the surviving entries (in the
481-
// original backing array). Rows are appended to *rows; the slice
482-
// header is updated through the pointer so the caller observes the
483-
// growth.
484-
func drainRetiredSlots(retired []retiredSlot, rows *[]MatrixRow) []retiredSlot {
501+
// drainRetiredSlots emits a row for each retired slot and returns the
502+
// entries whose grace window has not yet elapsed. Rows are appended
503+
// to *rows so the caller sees the slice growth. Entries whose
504+
// elapsed time (now - retiredAt) has reached grace are dropped after
505+
// this final drain.
506+
func drainRetiredSlots(retired []retiredSlot, rows *[]MatrixRow, now time.Time, grace time.Duration) []retiredSlot {
485507
keep := retired[:0]
486508
for _, r := range retired {
487509
*rows = appendDrainedRow(*rows, r.slot)
488-
r.remaining--
489-
if r.remaining > 0 {
510+
if now.Sub(r.retiredAt) < grace {
490511
keep = append(keep, r)
491512
}
492513
}
493514
return keep
494515
}
495516

496-
// advancePendingPrunes decrements each entry's grace counter; entries
497-
// whose counter reaches zero have their routeID actually pruned from
498-
// the bucket's MemberRoutes. Returns the surviving entries.
499-
func advancePendingPrunes(pending []memberPrune) []memberPrune {
517+
// advancePendingPrunes lets each pending member-prune live until its
518+
// retiredAt+grace passes, then actually prunes the routeID from the
519+
// bucket's MemberRoutes. Returns the entries still inside the grace
520+
// window.
521+
func advancePendingPrunes(pending []memberPrune, now time.Time, grace time.Duration) []memberPrune {
500522
keep := pending[:0]
501523
for _, p := range pending {
502-
p.remaining--
503-
if p.remaining > 0 {
524+
if now.Sub(p.retiredAt) < grace {
504525
keep = append(keep, p)
505526
continue
506527
}
@@ -639,9 +660,12 @@ func rebuildSorted(tbl *routeTable) []*routeSlot {
639660
return out
640661
}
641662

642-
// findVirtualBucket returns the existing virtual bucket whose range
643-
// covers (or is closest to the right of) start. Returns nil when no
644-
// virtual bucket exists yet — caller creates one in that case.
663+
// findVirtualBucket returns the existing virtual bucket that covers
664+
// start, preferring an exact range match. If no bucket contains
665+
// start, returns the first aggregate in sortedSlots order so
666+
// over-budget routes collapse into a single global bucket rather than
667+
// fragmenting across many. Returns nil only when no virtual bucket
668+
// exists yet — caller creates one in that case.
645669
func findVirtualBucket(sorted []*routeSlot, start []byte) *routeSlot {
646670
for _, s := range sorted {
647671
if !s.Aggregate {

keyviz/sampler_test.go

Lines changed: 61 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -285,45 +285,47 @@ func TestRemoveRouteHarvestsPendingCounts(t *testing.T) {
285285
// TestRemoveVirtualMemberPrunesMemberRoutes pins Codex P2: when a
286286
// coarsened (virtual-bucket) RouteID is removed, the bucket's
287287
// MemberRoutes list must eventually drop that ID. Pruning is deferred
288-
// across retainedFlushes cycles so the row attribution stays correct
289-
// while the bucket counters still include the removed route's
288+
// across the wall-clock grace window so the row attribution stays
289+
// correct while the bucket counters still include the removed route's
290290
// pre-removal increments — TestRemoveVirtualMemberPruneDeferred pins
291291
// the grace-window half of this contract.
292292
func TestRemoveVirtualMemberPrunesMemberRoutes(t *testing.T) {
293293
t.Parallel()
294-
s := setupVirtualBucketWithThreeMembers(t)
294+
s, clk := setupVirtualBucketWithThreeMembers(t)
295295
s.RemoveRoute(2)
296-
for i := 0; i < retainedFlushes; i++ {
297-
s.Observe(3, OpRead, 1, 0)
298-
s.Flush()
299-
}
300-
// One more flush after the grace window to capture the post-prune
301-
// MemberRoutes state in a row.
296+
// Drain at least once within grace, then advance past grace so the
297+
// next Flush actually executes the prune. One more Flush captures
298+
// the post-prune MemberRoutes state in a row.
299+
s.Observe(3, OpRead, 1, 0)
300+
s.Flush()
301+
clk.Advance(s.graceWindow() + time.Second)
302+
s.Observe(3, OpRead, 1, 0)
303+
s.Flush()
302304
s.Observe(3, OpRead, 1, 0)
303305
s.Flush()
304306

305307
cols := s.Snapshot(time.Time{}, time.Time{})
306308
agg := cols[len(cols)-1].Rows[0]
307-
for _, m := range agg.MemberRoutes {
308-
if m == 2 {
309-
t.Fatalf("after grace, removed route 2 still in MemberRoutes: %v", agg.MemberRoutes)
310-
}
309+
if memberRoutesContain(agg.MemberRoutes, 2) {
310+
t.Fatalf("after grace, removed route 2 still in MemberRoutes: %v", agg.MemberRoutes)
311311
}
312312
if len(agg.MemberRoutes) != 1 || agg.MemberRoutes[0] != 3 {
313313
t.Fatalf("MemberRoutes = %v, want [3]", agg.MemberRoutes)
314314
}
315315
}
316316

317317
// TestRemoveVirtualMemberPruneDeferred pins the deferred-prune
318-
// contract: the removed routeID stays in MemberRoutes for the
319-
// retainedFlushes grace window so flushed rows whose counters still
320-
// include that route's pre-removal increments attribute the traffic
321-
// correctly.
318+
// contract: the removed routeID stays in MemberRoutes throughout the
319+
// wall-clock grace window so flushed rows whose counters still include
320+
// that route's pre-removal increments attribute the traffic correctly.
322321
func TestRemoveVirtualMemberPruneDeferred(t *testing.T) {
323322
t.Parallel()
324-
s := setupVirtualBucketWithThreeMembers(t)
323+
s, _ := setupVirtualBucketWithThreeMembers(t)
325324
s.RemoveRoute(2)
326-
for i := 0; i < retainedFlushes; i++ {
325+
// Two flushes inside the grace window — the clock is not advanced,
326+
// so each Flush sees now-retiredAt == 0 < grace and keeps the
327+
// prune entry.
328+
for i := 0; i < 2; i++ {
327329
s.Observe(3, OpRead, 1, 0)
328330
s.Flush()
329331
col := lastSnapshotColumn(t, s)
@@ -352,9 +354,9 @@ func memberRoutesContain(members []uint64, target uint64) bool {
352354
return false
353355
}
354356

355-
func setupVirtualBucketWithThreeMembers(t *testing.T) *MemSampler {
357+
func setupVirtualBucketWithThreeMembers(t *testing.T) (*MemSampler, *fakeClock) {
356358
t.Helper()
357-
s, _ := newTestSampler(t, MemSamplerOptions{
359+
s, clk := newTestSampler(t, MemSamplerOptions{
358360
Step: time.Second,
359361
HistoryColumns: 4,
360362
MaxTrackedRoutes: 1,
@@ -368,7 +370,7 @@ func setupVirtualBucketWithThreeMembers(t *testing.T) *MemSampler {
368370
if s.RegisterRoute(3, []byte("e"), []byte("g")) {
369371
t.Fatal("route 3 should fold (over budget)")
370372
}
371-
return s
373+
return s, clk
372374
}
373375

374376
// TestRegisterDoesNotRaceFlushOnVirtualBucket pins Codex P1: folding
@@ -538,31 +540,33 @@ func TestFlushSortsMixedLiveAndRetiredRows(t *testing.T) {
538540
}
539541

540542
// TestRetiredSlotGracePeriod asserts that a retired slot stays in the
541-
// drain queue for `retainedFlushes` cycles so an Observe goroutine
543+
// drain queue for the wall-clock grace window so an Observe goroutine
542544
// that loaded the pre-RemoveRoute table snapshot can complete its
543545
// atomic.Add into the slot and still have the increment harvested by
544546
// a subsequent Flush. We simulate the late writer by reaching into
545-
// the retired-slot queue directly and bumping the counter between the
546-
// two flushes.
547+
// the retired-slot queue directly and bumping the counter between
548+
// flushes inside the grace window.
547549
func TestRetiredSlotGracePeriod(t *testing.T) {
548550
t.Parallel()
549-
s, _ := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 8})
551+
s, clk := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 8})
550552
if !s.RegisterRoute(1, []byte("a"), []byte("b")) {
551553
t.Fatal("RegisterRoute(1) returned false")
552554
}
553555
s.Observe(1, OpRead, 1, 2)
554556
s.RemoveRoute(1)
555557

556558
lateSlot := graceQueueSingleSlot(t, s)
557-
s.Flush()
559+
s.Flush() // drain inside grace
558560
lateSlot.reads.Add(7)
559-
s.Flush()
561+
s.Flush() // drain inside grace, harvests the late-writer increment
560562

561563
total := totalReadsForRoute(s.Snapshot(time.Time{}, time.Time{}), 1)
562564
if total != 1+7 {
563565
t.Fatalf("expected reads 8 (pre-remove + late-writer), got %d", total)
564566
}
565567

568+
clk.Advance(s.graceWindow() + time.Second)
569+
s.Flush() // last drain, retiredAt+grace now passed → entry dropped
566570
s.retiredMu.Lock()
567571
leftover := len(s.retiredSlots)
568572
s.retiredMu.Unlock()
@@ -682,3 +686,32 @@ func TestSnapshotReturnsDeepCopy(t *testing.T) {
682686
t.Fatalf("snapshot bounds aliased live state: start=%q end=%q", r.Start, r.End)
683687
}
684688
}
689+
690+
// BenchmarkObserveHit pins the hot-path properties claimed in the
691+
// package doc: a single atomic.Pointer.Load, a map lookup, and at
692+
// most two atomic.AddUint64 calls — no allocation, no mutex. Use
693+
// `go test -bench=BenchmarkObserveHit -benchmem ./keyviz/...` to
694+
// catch regressions before they reach the coordinator wiring PR.
695+
func BenchmarkObserveHit(b *testing.B) {
696+
s := NewMemSampler(MemSamplerOptions{Step: time.Second, HistoryColumns: 4})
697+
if !s.RegisterRoute(1, []byte("a"), []byte("b")) {
698+
b.Fatal("RegisterRoute(1) returned false")
699+
}
700+
b.ReportAllocs()
701+
b.ResetTimer()
702+
for i := 0; i < b.N; i++ {
703+
s.Observe(1, OpRead, 16, 64)
704+
}
705+
}
706+
707+
// BenchmarkObserveMiss exercises the unknown-route path so a future
708+
// regression that grows allocations on misses (e.g. virtualForRoute
709+
// fallback path) is caught.
710+
func BenchmarkObserveMiss(b *testing.B) {
711+
s := NewMemSampler(MemSamplerOptions{Step: time.Second, HistoryColumns: 4})
712+
b.ReportAllocs()
713+
b.ResetTimer()
714+
for i := 0; i < b.N; i++ {
715+
s.Observe(99, OpRead, 16, 64)
716+
}
717+
}

0 commit comments

Comments
 (0)