From 252487ac42c325fdfd448fc796b01603b815270a Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 00:41:32 +0900 Subject: [PATCH 01/13] feat(keyviz): in-memory Sampler + ring buffer + RunFlusher MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit First slice of the Phase 2 Key Visualizer per docs/admin_ui_key_visualizer_design.md §5. This commit only adds the keyviz/ package — wiring into kv.ShardedCoordinator, adapter.AdminServer.GetKeyVizMatrix, and main.go follow in subsequent commits to keep diffs reviewable. Highlights: - Sampler interface (nil-safe contract: nil receiver makes Observe / RegisterRoute / Flush / Snapshot all no-ops, so callers wiring an unconfigured sampler compile to zero overhead). - MemSampler hot path: atomic.Pointer[routeTable].Load + map lookup + atomic.AddUint64 ×2 (count + bytes). Matches design §10's per-call budget — no allocation, no mutex. - Flush drains via atomic.SwapUint64; no pointer retirement, so no late-writer race past the snapshot. Idle slots skipped from the emitted MatrixColumn. - COW route table publish (RegisterRoute / RemoveRoute) under routesMu off the hot path; one atomic.Pointer.Store on publish. - Route-budget cap (--keyvizMaxTrackedRoutes default 10000): excess routes fold into a virtual aggregate slot (MatrixRow.Aggregate=true + MemberRoutes list), per design §5.3. - ringBuffer with chronological [from, to) Range queries; oldest entries drop on overflow. - RunFlusher convenience for the step-interval ticker so call sites don't repeat the boilerplate. Tests under -race cover: nil-safe contract, basic Observe→Flush roundtrip, no-counts-lost across flush boundary under writer churn, route budget coarsening, snapshot range filtering, ring eviction, RemoveRoute, unknown-route Observe, concurrent register+observe. --- keyviz/flusher.go | 37 ++++ keyviz/flusher_test.go | 57 +++++ keyviz/ring_buffer.go | 80 +++++++ keyviz/sampler.go | 460 +++++++++++++++++++++++++++++++++++++++++ keyviz/sampler_test.go | 329 +++++++++++++++++++++++++++++ 5 files changed, 963 insertions(+) create mode 100644 keyviz/flusher.go create mode 100644 keyviz/flusher_test.go create mode 100644 keyviz/ring_buffer.go create mode 100644 keyviz/sampler.go create mode 100644 keyviz/sampler_test.go diff --git a/keyviz/flusher.go b/keyviz/flusher.go new file mode 100644 index 000000000..0be239b35 --- /dev/null +++ b/keyviz/flusher.go @@ -0,0 +1,37 @@ +package keyviz + +import ( + "context" + "time" +) + +// RunFlusher drives Sampler.Flush at the supplied interval until ctx +// is cancelled. Returns when ctx fires; the final tick is not +// executed (a graceful shutdown should call Sampler.Flush once more +// after RunFlusher returns if it wants to harvest the in-progress +// step). +// +// step <= 0 falls back to DefaultStep. +// +// This is a tiny wrapper so call sites in main.go don't need to spell +// out the ticker boilerplate; testing the boilerplate is the unit test +// for this package, not for callers. 
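+//
+// Usage sketch (ctx and sampler are caller-side names; passing 0 for
+// step falls back to DefaultStep):
+//
+//	go RunFlusher(ctx, sampler, 0)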
+func RunFlusher(ctx context.Context, s *MemSampler, step time.Duration) { + if s == nil { + <-ctx.Done() + return + } + if step <= 0 { + step = DefaultStep + } + t := time.NewTicker(step) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + s.Flush() + } + } +} diff --git a/keyviz/flusher_test.go b/keyviz/flusher_test.go new file mode 100644 index 000000000..fcff6ab45 --- /dev/null +++ b/keyviz/flusher_test.go @@ -0,0 +1,57 @@ +package keyviz + +import ( + "context" + "testing" + "time" +) + +func TestRunFlusherTicksUntilCancel(t *testing.T) { + t.Parallel() + s := NewMemSampler(MemSamplerOptions{Step: 5 * time.Millisecond, HistoryColumns: 16}) + if !s.RegisterRoute(1, []byte("a"), []byte("b")) { + t.Fatal("Register failed") + } + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + RunFlusher(ctx, s, 5*time.Millisecond) + }() + + // Drive Observe across ticker firings. + for i := 0; i < 10; i++ { + s.Observe(1, OpRead, 0, 0) + time.Sleep(2 * time.Millisecond) + } + cancel() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("RunFlusher did not return after cancel") + } + + cols := s.Snapshot(time.Time{}, time.Time{}) + if len(cols) == 0 { + t.Fatal("expected at least one column from background flushes") + } +} + +// TestRunFlusherNilSamplerWaitsCtx asserts the nil-sampler contract +// (RunFlusher just blocks on ctx.Done so callers can hard-wire it +// regardless of whether keyviz is enabled). +func TestRunFlusherNilSamplerWaitsCtx(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + RunFlusher(ctx, nil, time.Millisecond) + }() + cancel() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("RunFlusher(nil) did not return on cancel") + } +} diff --git a/keyviz/ring_buffer.go b/keyviz/ring_buffer.go new file mode 100644 index 000000000..e1eedaf5f --- /dev/null +++ b/keyviz/ring_buffer.go @@ -0,0 +1,80 @@ +package keyviz + +import ( + "sort" + "time" +) + +// ringBuffer is a fixed-capacity circular buffer of MatrixColumn, +// oldest-first. Push drops the oldest entry once cap is reached. +// Range returns a half-open [from, to) slice in chronological order. +// +// The buffer is not goroutine-safe by itself; callers (MemSampler) +// guard it with historyMu. +type ringBuffer struct { + cap int + buf []MatrixColumn // logical-index ordered; head is the oldest + wrapped bool + pos int // next write index when wrapped == false +} + +func newRingBuffer(cap int) *ringBuffer { + if cap < 1 { + cap = 1 + } + return &ringBuffer{cap: cap, buf: make([]MatrixColumn, 0, cap)} +} + +// Push appends a column. Once length reaches cap the oldest entry is +// dropped (overwritten in place). +func (r *ringBuffer) Push(col MatrixColumn) { + if !r.wrapped { + r.buf = append(r.buf, col) + if len(r.buf) >= r.cap { + r.wrapped = true + r.pos = 0 + } + return + } + r.buf[r.pos] = col + r.pos = (r.pos + 1) % r.cap +} + +// Range returns the columns whose At falls in [from, to), oldest +// first. Either bound may be the zero Time, meaning unbounded on that +// side. The returned slice is freshly allocated; callers may mutate +// it freely. +func (r *ringBuffer) Range(from, to time.Time) []MatrixColumn { + all := r.snapshotOrdered() + // snapshotOrdered is already chronologically ordered. 
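+	// Binary-search both bounds against the ordered snapshot: lo is
+	// the first column with At >= from, hi is the first column with
+	// At >= to, so all[lo:hi] is exactly the half-open [from, to)
+	// window requested.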
+ lo := 0 + if !from.IsZero() { + lo = sort.Search(len(all), func(i int) bool { + return !all[i].At.Before(from) + }) + } + hi := len(all) + if !to.IsZero() { + hi = sort.Search(len(all), func(i int) bool { + return !all[i].At.Before(to) + }) + } + if lo > hi { + return nil + } + out := make([]MatrixColumn, hi-lo) + copy(out, all[lo:hi]) + return out +} + +// snapshotOrdered returns a chronologically ordered (oldest first) +// view of the buffer contents in a fresh slice. +func (r *ringBuffer) snapshotOrdered() []MatrixColumn { + if !r.wrapped { + return append([]MatrixColumn(nil), r.buf...) + } + out := make([]MatrixColumn, 0, r.cap) + out = append(out, r.buf[r.pos:]...) + out = append(out, r.buf[:r.pos]...) + return out +} diff --git a/keyviz/sampler.go b/keyviz/sampler.go new file mode 100644 index 000000000..4bc388852 --- /dev/null +++ b/keyviz/sampler.go @@ -0,0 +1,460 @@ +// Package keyviz implements the in-memory sampler that backs the +// docs/admin_ui_key_visualizer_design.md heatmap. +// +// The sampler sits on the data-plane hot path and counts requests per +// Raft route. The contract for callers (today, kv.ShardedCoordinator): +// +// - Construct a MemSampler with NewMemSampler. +// - Wire it through the coordinator with a constructor option; the +// coordinator calls Sampler.Observe at the dispatch entry, after +// resolving the RouteID. +// - Run Flush every StepSeconds in a background goroutine — RunFlusher +// does this for you with the supplied clock and ctx. +// - Read the rendered matrix via Snapshot for the Admin gRPC service. +// +// Hot-path properties (see design §5.1, §10): +// +// - Observe is a single atomic.Pointer[routeTable].Load, a plain map +// lookup against an immutable snapshot, and four atomic.AddUint64 +// calls. No allocation, no mutex. +// - Flush drains the per-route counters with atomic.SwapUint64; no +// pointer retirement, so a late writer cannot race past the snapshot +// and lose counts. +// - Adding / removing routes (RegisterRoute, RemoveRoute, ApplySplit, +// ApplyMerge) builds a fresh routeTable copy under a non-hot-path +// mutex and publishes it with a single atomic.Pointer.Store. Routes +// mutated mid-step keep their counters in the new table by design. +package keyviz + +import ( + "sort" + "sync" + "sync/atomic" + "time" +) + +// Op identifies which counter family Observe should bump. +type Op uint8 + +const ( + OpRead Op = iota + OpWrite +) + +// Series selects which counter the matrix response should expose. +type Series uint8 + +const ( + SeriesReads Series = iota + SeriesWrites + SeriesReadBytes + SeriesWriteBytes +) + +// Sampler is the narrow interface the coordinator depends on. The +// nil-safe contract is documented per-method so a coordinator wired +// without a sampler compiles to a no-op call. +type Sampler interface { + // Observe records a single request against a route. Op identifies + // the counter family. keyLen and valueLen are summed into the + // matching *Bytes counter; pass 0 for read-only ops where the + // payload size is irrelevant. + Observe(routeID uint64, op Op, keyLen, valueLen int) +} + +// Defaults for MemSamplerOptions when fields are left zero. +const ( + DefaultStep = 60 * time.Second + DefaultHistoryColumns = 1440 // 24 hours at 60s steps. + DefaultMaxTrackedRoutes = 10_000 +) + +// MemSamplerOptions configures NewMemSampler. Zero values fall back to +// the Default* constants above; passing a struct literal with only the +// fields you care about is the expected call style. 
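+//
+// For example (values illustrative):
+//
+//	s := NewMemSampler(MemSamplerOptions{Step: 30 * time.Second})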
+type MemSamplerOptions struct { + // Step is the flush interval. The ring buffer's resolution. + Step time.Duration + // HistoryColumns caps the ring buffer length. Older columns are + // dropped on push when the buffer is full. + HistoryColumns int + // MaxTrackedRoutes caps the number of routes whose counters are + // kept individually before coarsening kicks in. RegisterRoute + // returns false past this cap, the route ID maps into a virtual + // bucket, and Snapshot reports it with Aggregate=true. + MaxTrackedRoutes int + // Now overrides time.Now for tests; nil falls back to time.Now. + Now func() time.Time +} + +// MemSampler is the in-process Sampler implementation. The zero value +// is not usable — construct via NewMemSampler. +type MemSampler struct { + opts MemSamplerOptions + now func() time.Time + + // table holds the immutable map of currently-tracked routes. The + // hot path Observe() does a single Load + map lookup; mutations + // (Register / Remove / Split / Merge) take routesMu, copy, mutate + // the copy, then atomic-store the new pointer. + table atomic.Pointer[routeTable] + + // routesMu serialises the routeTable copy-on-write update path. + // Held only by non-hot-path callers. + routesMu sync.Mutex + + // historyMu guards the ring buffer. Reads (Snapshot) and writes + // (Flush) acquire it; Observe never touches it. + historyMu sync.Mutex + history *ringBuffer +} + +// routeTable is the COW snapshot Observe operates on. Once published +// via MemSampler.table.Store, fields are read-only. +type routeTable struct { + // slots is the live route → slot map. Lookups under the hot path + // run against this snapshot. + slots map[uint64]*routeSlot + // virtualForRoute maps a real RouteID that didn't fit under + // MaxTrackedRoutes to its virtual-bucket slot. Observe consults + // this fallback when slots[routeID] is missing. + virtualForRoute map[uint64]*routeSlot + // sortedSlots is the union of slots and virtualForRoute values + // sorted by Start, used by Snapshot for stable row ordering. + sortedSlots []*routeSlot +} + +// routeSlot owns the atomic counters for a tracked route or virtual +// bucket. Counter fields are touched by Observe (atomic.AddUint64) and +// Flush (atomic.SwapUint64); the metadata (RouteID, Start, End) is +// immutable after the slot is published. +type routeSlot struct { + RouteID uint64 + Start []byte + End []byte + // Aggregate marks virtual buckets that fold multiple coarsened + // routes together (Snapshot surfaces this in MatrixRow). + Aggregate bool + MemberRoutes []uint64 + + reads atomic.Uint64 + writes atomic.Uint64 + readBytes atomic.Uint64 + writeBytes atomic.Uint64 +} + +// MatrixColumn is one slice of the heatmap at a single flush time. +type MatrixColumn struct { + At time.Time + Rows []MatrixRow +} + +// MatrixRow is a single route or virtual bucket's counter snapshot +// taken at flush time. +type MatrixRow struct { + RouteID uint64 + Start, End []byte + Aggregate bool + MemberRoutes []uint64 + + Reads uint64 + Writes uint64 + ReadBytes uint64 + WriteBytes uint64 +} + +// NewMemSampler constructs a sampler with the supplied options. Zero +// fields fall back to the Default* constants. Returns nil only if +// opts.HistoryColumns is explicitly negative; callers should pass a +// zero options struct for the default configuration. 
+func NewMemSampler(opts MemSamplerOptions) *MemSampler { + if opts.Step <= 0 { + opts.Step = DefaultStep + } + if opts.HistoryColumns == 0 { + opts.HistoryColumns = DefaultHistoryColumns + } + if opts.MaxTrackedRoutes == 0 { + opts.MaxTrackedRoutes = DefaultMaxTrackedRoutes + } + now := opts.Now + if now == nil { + now = time.Now + } + s := &MemSampler{ + opts: opts, + now: now, + history: newRingBuffer(opts.HistoryColumns), + } + s.table.Store(newEmptyRouteTable()) + return s +} + +func newEmptyRouteTable() *routeTable { + return &routeTable{ + slots: map[uint64]*routeSlot{}, + virtualForRoute: map[uint64]*routeSlot{}, + } +} + +// Observe records one request against a route. Cost on a hit: +// atomic.Pointer.Load + plain map lookup + 2× atomic.AddUint64 (count +// and bytes). Misses (RouteID never registered) drop silently — the +// route-watch subscriber is responsible for Register before the +// coordinator publishes the new RouteID. +func (s *MemSampler) Observe(routeID uint64, op Op, keyLen, valueLen int) { + if s == nil { + return + } + tbl := s.table.Load() + slot, ok := tbl.slots[routeID] + if !ok { + slot, ok = tbl.virtualForRoute[routeID] + if !ok { + return + } + } + bytes := uint64(0) + if keyLen > 0 { + bytes += uint64(keyLen) + } + if valueLen > 0 { + bytes += uint64(valueLen) + } + switch op { + case OpRead: + slot.reads.Add(1) + if bytes > 0 { + slot.readBytes.Add(bytes) + } + case OpWrite: + slot.writes.Add(1) + if bytes > 0 { + slot.writeBytes.Add(bytes) + } + } +} + +// RegisterRoute adds a (RouteID, [Start, End)) pair to the tracking +// set. Returns true when the route gets its own slot, false when the +// MaxTrackedRoutes cap was hit and the route was folded into a +// virtual aggregate bucket. Idempotent: calling twice with the same +// RouteID is a no-op (the original slot stays in place). +func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool { + if s == nil { + return false + } + s.routesMu.Lock() + defer s.routesMu.Unlock() + + cur := s.table.Load() + if _, ok := cur.slots[routeID]; ok { + return true + } + if _, ok := cur.virtualForRoute[routeID]; ok { + return false + } + + next := copyRouteTable(cur) + if len(next.slots) < s.opts.MaxTrackedRoutes { + slot := &routeSlot{ + RouteID: routeID, + Start: cloneBytes(start), + End: cloneBytes(end), + } + next.slots[routeID] = slot + next.sortedSlots = appendSorted(next.sortedSlots, slot) + s.table.Store(next) + return true + } + + // Coarsening: route is folded into the closest-by-start virtual + // bucket (or one is created if no virtual bucket exists yet). + bucket := findVirtualBucket(next.sortedSlots, start) + if bucket == nil { + bucket = &routeSlot{ + RouteID: routeID, + Start: cloneBytes(start), + End: cloneBytes(end), + Aggregate: true, + MemberRoutes: []uint64{routeID}, + } + next.sortedSlots = appendSorted(next.sortedSlots, bucket) + } else { + bucket.MemberRoutes = append(bucket.MemberRoutes, routeID) + // Extend bucket end if the new route reaches further right. + if len(end) == 0 || (len(bucket.End) != 0 && bytesGT(end, bucket.End)) { + bucket.End = cloneBytes(end) + } + if bytesLT(start, bucket.Start) { + bucket.Start = cloneBytes(start) + } + } + next.virtualForRoute[routeID] = bucket + s.table.Store(next) + return false +} + +// RemoveRoute drops a RouteID from tracking. Counts accumulated since +// the last flush stay in the slot until the next Flush picks them up +// from the retired routeTable; subsequent Observe(routeID, …) calls +// are silent no-ops. Idempotent. 
+func (s *MemSampler) RemoveRoute(routeID uint64) { + if s == nil { + return + } + s.routesMu.Lock() + defer s.routesMu.Unlock() + cur := s.table.Load() + if _, ok := cur.slots[routeID]; !ok { + if _, ok := cur.virtualForRoute[routeID]; !ok { + return + } + } + next := copyRouteTable(cur) + delete(next.slots, routeID) + delete(next.virtualForRoute, routeID) + next.sortedSlots = rebuildSorted(next) + s.table.Store(next) +} + +// Flush drains every slot's counters with atomic.SwapUint64 and +// appends one MatrixColumn to the ring buffer. Idle slots (all +// counters zero) are skipped to keep the column compact. +func (s *MemSampler) Flush() { + if s == nil { + return + } + tbl := s.table.Load() + col := MatrixColumn{At: s.now()} + for _, slot := range tbl.sortedSlots { + reads := slot.reads.Swap(0) + writes := slot.writes.Swap(0) + readBytes := slot.readBytes.Swap(0) + writeBytes := slot.writeBytes.Swap(0) + if reads == 0 && writes == 0 && readBytes == 0 && writeBytes == 0 { + continue + } + col.Rows = append(col.Rows, MatrixRow{ + RouteID: slot.RouteID, + Start: slot.Start, + End: slot.End, + Aggregate: slot.Aggregate, + MemberRoutes: append([]uint64(nil), slot.MemberRoutes...), + Reads: reads, + Writes: writes, + ReadBytes: readBytes, + WriteBytes: writeBytes, + }) + } + s.historyMu.Lock() + s.history.Push(col) + s.historyMu.Unlock() +} + +// Snapshot returns the matrix columns in the supplied [from, to) +// half-open time range. Ordering is oldest-first. Series selects which +// MatrixRow value the caller is going to display; the slot metadata +// (RouteID, Start, End, Aggregate, MemberRoutes) is included on every +// row so a UI can render the same response with different series +// without a re-query. +func (s *MemSampler) Snapshot(from, to time.Time) []MatrixColumn { + if s == nil { + return nil + } + s.historyMu.Lock() + defer s.historyMu.Unlock() + return s.history.Range(from, to) +} + +// helpers below ----------------------------------------------------- + +func copyRouteTable(src *routeTable) *routeTable { + dst := &routeTable{ + slots: make(map[uint64]*routeSlot, len(src.slots)+1), + virtualForRoute: make(map[uint64]*routeSlot, len(src.virtualForRoute)+1), + sortedSlots: append([]*routeSlot(nil), src.sortedSlots...), + } + for k, v := range src.slots { + dst.slots[k] = v + } + for k, v := range src.virtualForRoute { + dst.virtualForRoute[k] = v + } + return dst +} + +func appendSorted(slots []*routeSlot, slot *routeSlot) []*routeSlot { + idx := sort.Search(len(slots), func(i int) bool { + return bytesGE(slots[i].Start, slot.Start) + }) + slots = append(slots, nil) + copy(slots[idx+1:], slots[idx:]) + slots[idx] = slot + return slots +} + +func rebuildSorted(tbl *routeTable) []*routeSlot { + out := make([]*routeSlot, 0, len(tbl.slots)+len(tbl.virtualForRoute)) + seen := make(map[*routeSlot]struct{}, cap(out)) + for _, s := range tbl.slots { + if _, dup := seen[s]; dup { + continue + } + seen[s] = struct{}{} + out = append(out, s) + } + for _, s := range tbl.virtualForRoute { + if _, dup := seen[s]; dup { + continue + } + seen[s] = struct{}{} + out = append(out, s) + } + sort.Slice(out, func(i, j int) bool { + return bytesLT(out[i].Start, out[j].Start) + }) + return out +} + +// findVirtualBucket returns the existing virtual bucket whose range +// covers (or is closest to the right of) start. Returns nil when no +// virtual bucket exists yet — caller creates one in that case. 
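+//
+// Note: when no bucket covers start, the fallback below is simply the
+// first aggregate bucket in sorted order, not necessarily the nearest
+// neighbour.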
+func findVirtualBucket(sorted []*routeSlot, start []byte) *routeSlot { + for _, s := range sorted { + if !s.Aggregate { + continue + } + if bytesLE(s.Start, start) && (len(s.End) == 0 || bytesLT(start, s.End)) { + return s + } + } + for _, s := range sorted { + if s.Aggregate { + return s + } + } + return nil +} + +func cloneBytes(b []byte) []byte { + if len(b) == 0 { + return nil + } + out := make([]byte, len(b)) + copy(out, b) + return out +} + +func bytesLT(a, b []byte) bool { + for i := 0; i < len(a) && i < len(b); i++ { + if a[i] != b[i] { + return a[i] < b[i] + } + } + return len(a) < len(b) +} + +func bytesLE(a, b []byte) bool { return !bytesGT(a, b) } +func bytesGE(a, b []byte) bool { return !bytesLT(a, b) } +func bytesGT(a, b []byte) bool { return bytesLT(b, a) } diff --git a/keyviz/sampler_test.go b/keyviz/sampler_test.go new file mode 100644 index 000000000..1dc7b0857 --- /dev/null +++ b/keyviz/sampler_test.go @@ -0,0 +1,329 @@ +package keyviz + +import ( + "sync" + "sync/atomic" + "testing" + "time" +) + +func newTestSampler(t *testing.T, opts MemSamplerOptions) (*MemSampler, *fakeClock) { + t.Helper() + clk := &fakeClock{now: time.Unix(1_700_000_000, 0)} + if opts.Now == nil { + opts.Now = clk.Now + } + return NewMemSampler(opts), clk +} + +type fakeClock struct { + mu sync.Mutex + now time.Time +} + +func (c *fakeClock) Now() time.Time { + c.mu.Lock() + defer c.mu.Unlock() + return c.now +} + +func (c *fakeClock) Advance(d time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.now = c.now.Add(d) +} + +func TestNilSamplerObserveSafe(t *testing.T) { + t.Parallel() + var s *MemSampler // nil + s.Observe(1, OpRead, 10, 20) + if s.RegisterRoute(1, []byte("a"), []byte("b")) { + t.Fatal("nil RegisterRoute should return false") + } + s.RemoveRoute(1) + s.Flush() + if got := s.Snapshot(time.Time{}, time.Time{}); got != nil { + t.Fatalf("nil Snapshot should be nil, got %v", got) + } +} + +func TestObserveAndFlushBasic(t *testing.T) { + t.Parallel() + s, clk := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 4}) + if !s.RegisterRoute(1, []byte("a"), []byte("c")) { + t.Fatal("RegisterRoute(1) returned false") + } + s.Observe(1, OpRead, 5, 10) + s.Observe(1, OpRead, 5, 10) + s.Observe(1, OpWrite, 5, 100) + + s.Flush() + clk.Advance(time.Second) + s.Flush() // empty + + cols := s.Snapshot(time.Time{}, time.Time{}) + if len(cols) != 2 { + t.Fatalf("got %d columns, want 2", len(cols)) + } + c0 := cols[0] + if len(c0.Rows) != 1 { + t.Fatalf("col0 rows = %d, want 1", len(c0.Rows)) + } + r := c0.Rows[0] + if r.RouteID != 1 || r.Reads != 2 || r.Writes != 1 || r.ReadBytes != 30 || r.WriteBytes != 105 { + t.Fatalf("col0 row = %+v", r) + } + if len(cols[1].Rows) != 0 { + t.Fatalf("col1 should be empty (no observe between flushes), got %v", cols[1].Rows) + } +} + +// TestNoCountsLostAcrossFlush asserts the SwapUint64 flush protocol +// doesn't drop counts even when many goroutines hammer Observe across +// the flush boundary. Repeats the exercise N times to flush out +// scheduling races. 
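+//
+// Conservation invariant: each Observe increment is drained by exactly
+// one Swap, so after the final harvesting Flush the snapshot total
+// must equal writers × perWriter.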
+func TestNoCountsLostAcrossFlush(t *testing.T) { + t.Parallel() + const ( + writers = 8 + perWriter = 5_000 + flushes = 20 + ) + s, _ := newTestSampler(t, MemSamplerOptions{Step: time.Millisecond, HistoryColumns: 1024}) + if !s.RegisterRoute(1, []byte("a"), []byte("b")) { + t.Fatal("Register failed") + } + + stop := make(chan struct{}) + var wg sync.WaitGroup + for w := 0; w < writers; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for n := 0; n < perWriter; n++ { + select { + case <-stop: + return + default: + } + s.Observe(1, OpRead, 1, 0) + } + }() + } + // Fire flushes from a separate goroutine while writers run. + flushDone := make(chan struct{}) + go func() { + defer close(flushDone) + for i := 0; i < flushes; i++ { + s.Flush() + time.Sleep(time.Microsecond * 100) + } + }() + + wg.Wait() + close(stop) + <-flushDone + // One last flush so any straggler atomic.Add is harvested. + s.Flush() + + var total uint64 + for _, col := range s.Snapshot(time.Time{}, time.Time{}) { + for _, row := range col.Rows { + total += row.Reads + } + } + want := uint64(writers * perWriter) + if total != want { + t.Fatalf("total reads = %d, want %d (lost %d)", total, want, want-total) + } +} + +func TestRouteBudgetCoarsensIntoVirtualBucket(t *testing.T) { + t.Parallel() + s := budgetSetup(t) + s.Observe(1, OpRead, 1, 0) + s.Observe(2, OpRead, 1, 0) + s.Observe(3, OpRead, 1, 0) + s.Flush() + rows := budgetSingleColumnRows(t, s) + agg := findAggregateRow(t, rows) + if len(agg.MemberRoutes) != 1 || agg.MemberRoutes[0] != 3 { + t.Fatalf("aggregate.MemberRoutes = %v, want [3]", agg.MemberRoutes) + } +} + +func budgetSetup(t *testing.T) *MemSampler { + t.Helper() + s, _ := newTestSampler(t, MemSamplerOptions{ + Step: time.Second, + HistoryColumns: 4, + MaxTrackedRoutes: 2, + }) + if !s.RegisterRoute(1, []byte("a"), []byte("c")) { + t.Fatal("route 1 should fit") + } + if !s.RegisterRoute(2, []byte("c"), []byte("e")) { + t.Fatal("route 2 should fit") + } + if s.RegisterRoute(3, []byte("e"), []byte("g")) { + t.Fatal("route 3 over budget should return false (folded into virtual bucket)") + } + return s +} + +func budgetSingleColumnRows(t *testing.T, s *MemSampler) []MatrixRow { + t.Helper() + cols := s.Snapshot(time.Time{}, time.Time{}) + if len(cols) != 1 { + t.Fatalf("len(cols) = %d", len(cols)) + } + rows := cols[0].Rows + if len(rows) != 3 { + t.Fatalf("rows = %d, want 3", len(rows)) + } + return rows +} + +func findAggregateRow(t *testing.T, rows []MatrixRow) MatrixRow { + t.Helper() + for i := range rows { + if rows[i].Aggregate { + return rows[i] + } + } + t.Fatalf("no aggregate row in output: %+v", rows) + return MatrixRow{} +} + +func TestSnapshotRangeFilters(t *testing.T) { + t.Parallel() + s, clk := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 8}) + s.RegisterRoute(1, []byte("a"), []byte("b")) + for i := 0; i < 5; i++ { + s.Observe(1, OpRead, 0, 0) + s.Flush() + clk.Advance(time.Second) + } + all := s.Snapshot(time.Time{}, time.Time{}) + if len(all) != 5 { + t.Fatalf("all len = %d", len(all)) + } + mid := all[1].At + end := all[3].At + got := s.Snapshot(mid, end) + if len(got) != 2 { + t.Fatalf("[mid,end) len = %d, want 2 (entries at +1s and +2s); cols=%v", len(got), columnTimes(all)) + } + if !got[0].At.Equal(mid) { + t.Fatalf("got[0].At = %v, want %v", got[0].At, mid) + } +} + +func TestRingBufferDropsOldest(t *testing.T) { + t.Parallel() + s, clk := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 3}) + s.RegisterRoute(1, []byte("a"), []byte("b")) + 
for i := 0; i < 5; i++ { + s.Observe(1, OpRead, 0, 0) + s.Flush() + clk.Advance(time.Second) + } + all := s.Snapshot(time.Time{}, time.Time{}) + if len(all) != 3 { + t.Fatalf("len = %d, want 3 (cap)", len(all)) + } + // Should be the most recent three columns, oldest first. + for i := 1; i < len(all); i++ { + if !all[i].At.After(all[i-1].At) { + t.Fatalf("not chronological at %d: %v then %v", i, all[i-1].At, all[i].At) + } + } +} + +func TestRemoveRouteSilencesObserve(t *testing.T) { + t.Parallel() + s, _ := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 4}) + s.RegisterRoute(1, []byte("a"), []byte("b")) + s.Observe(1, OpRead, 0, 0) + s.RemoveRoute(1) + s.Observe(1, OpRead, 0, 0) // silently dropped + s.Flush() + cols := s.Snapshot(time.Time{}, time.Time{}) + // The pre-RemoveRoute Observe was on the now-retired slot, so the + // counter is already lost. We only assert that the post-Remove + // Observe didn't show up. + for _, c := range cols { + for _, r := range c.Rows { + if r.RouteID == 1 && r.Reads > 1 { + t.Fatalf("post-remove Observe leaked: %+v", r) + } + } + } +} + +func TestObserveOnUnknownRouteIsNoop(t *testing.T) { + t.Parallel() + s, _ := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 4}) + s.Observe(42, OpRead, 0, 0) + s.Flush() + cols := s.Snapshot(time.Time{}, time.Time{}) + for _, c := range cols { + if len(c.Rows) != 0 { + t.Fatalf("unknown-route Observe leaked: %+v", c) + } + } +} + +// TestConcurrentRegisterAndObserveRace is the -race regression: a +// concurrent RegisterRoute (COW table publish) must not race with +// Observe (Load + map lookup against the snapshot). +func TestConcurrentRegisterAndObserveRace(t *testing.T) { + t.Parallel() + s, _ := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 8}) + const routes = 64 + var observed atomic.Uint64 + + stop := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for i := uint64(1); i <= routes; i++ { + s.RegisterRoute(i, []byte{byte('a' + i%26)}, []byte{byte('a' + (i+1)%26)}) + time.Sleep(time.Microsecond) + } + }() + wg.Add(4) + for w := 0; w < 4; w++ { + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + } + for i := uint64(1); i <= routes; i++ { + s.Observe(i, OpRead, 0, 0) + observed.Add(1) + } + } + }() + } + + time.Sleep(20 * time.Millisecond) + close(stop) + wg.Wait() + + if observed.Load() == 0 { + t.Fatal("observers ran zero iterations — schedule starvation?") + } +} + +func columnTimes(cols []MatrixColumn) []time.Time { + out := make([]time.Time, len(cols)) + for i, c := range cols { + out[i] = c.At + } + return out +} From e492ad5f4cfd615aa018ff98912542c383518b82 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 00:58:42 +0900 Subject: [PATCH 02/13] fix(keyviz): protect virtual bucket meta + harvest pending counts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three Codex findings on commit 252487ac: P1 — RegisterRoute mutated a shared virtual bucket (MemberRoutes, Start, End) before publishing the COW table, but copyRouteTable only shallow-copies slot pointers, so the same routeSlot is reachable from the still-published table. A concurrent Flush reading MemberRoutes would observe a half-extended slice. Add metaMu (sync.RWMutex) to routeSlot; RegisterRoute holds it Lock() while mutating; Flush emits rows via slot.snapshotMeta() under RLock(). 
Observe never reads metadata so the hot path stays lockless. P2 — RemoveRoute deleted the slot from the active table immediately, so any counts accumulated since the last Flush became unreachable (the slot is GC'd before the next Flush sees it). Queue removed individual slots into MemSampler.retiredSlots; Flush drains the retired queue alongside the live table and clears it. Counts are preserved across route churn. P2 — RemoveRoute on a virtual-bucket member only deleted the virtualForRoute mapping; the shared bucket's MemberRoutes list still listed the removed RouteID, leaking stale drill-down metadata. Now prune the bucket's MemberRoutes list in place under metaMu while holding routesMu (no concurrent registers). Tests added (all under -race): - TestRemoveRouteHarvestsPendingCounts: 7 reads + Remove + Flush emits a row with Reads=7. - TestRemoveVirtualMemberPrunesMemberRoutes: removed virtual member is not advertised in subsequent Snapshot rows. - TestRegisterDoesNotRaceFlushOnVirtualBucket: -race detector catches the P1 regression if metaMu is dropped. --- keyviz/sampler.go | 143 ++++++++++++++++++++++++++++++++--------- keyviz/sampler_test.go | 133 +++++++++++++++++++++++++++++++++++++- 2 files changed, 243 insertions(+), 33 deletions(-) diff --git a/keyviz/sampler.go b/keyviz/sampler.go index 4bc388852..08ec90535 100644 --- a/keyviz/sampler.go +++ b/keyviz/sampler.go @@ -107,6 +107,13 @@ type MemSampler struct { // (Flush) acquire it; Observe never touches it. historyMu sync.Mutex history *ringBuffer + + // retiredSlots holds slots that RemoveRoute removed from the live + // table since the last Flush. Flush drains and clears the list so + // counts accumulated between (last flush, RemoveRoute) are not + // silently dropped during route churn. + retiredMu sync.Mutex + retiredSlots []*routeSlot } // routeTable is the COW snapshot Observe operates on. Once published @@ -126,9 +133,18 @@ type routeTable struct { // routeSlot owns the atomic counters for a tracked route or virtual // bucket. Counter fields are touched by Observe (atomic.AddUint64) and -// Flush (atomic.SwapUint64); the metadata (RouteID, Start, End) is -// immutable after the slot is published. +// Flush (atomic.SwapUint64); the metadata (RouteID, Start, End, +// Aggregate, MemberRoutes) for an individual route slot is immutable +// after the slot is published. +// +// Virtual buckets are the exception: when a new RouteID over the +// MaxTrackedRoutes cap folds into an existing bucket, RegisterRoute +// extends MemberRoutes and may grow Start/End. metaMu serialises +// those updates against Flush's read-side iteration so a concurrent +// Flush cannot observe a half-extended slice. Observe never reads +// these fields, so the hot path remains lockless. type routeSlot struct { + metaMu sync.RWMutex RouteID uint64 Start []byte End []byte @@ -143,6 +159,22 @@ type routeSlot struct { writeBytes atomic.Uint64 } +// snapshotMeta returns a defensive copy of the slot's metadata under +// the read lock. Used by Flush so the row it emits doesn't share +// MemberRoutes with the live slot (which a later RegisterRoute may +// extend). +func (s *routeSlot) snapshotMeta() (start, end []byte, aggregate bool, members []uint64) { + s.metaMu.RLock() + defer s.metaMu.RUnlock() + start = s.Start + end = s.End + aggregate = s.Aggregate + if len(s.MemberRoutes) > 0 { + members = append([]uint64(nil), s.MemberRoutes...) + } + return +} + // MatrixColumn is one slice of the heatmap at a single flush time. 
type MatrixColumn struct { At time.Time @@ -281,14 +313,21 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool { } next.sortedSlots = appendSorted(next.sortedSlots, bucket) } else { + // Mutate the existing bucket under its metaMu so a concurrent + // Flush iterating the previous table's snapshot doesn't observe + // a half-extended MemberRoutes slice or a partially-updated + // Start/End. Counters stay in place — they live next to the + // metadata fields but are protected by the atomic ops, not the + // mutex. + bucket.metaMu.Lock() bucket.MemberRoutes = append(bucket.MemberRoutes, routeID) - // Extend bucket end if the new route reaches further right. if len(end) == 0 || (len(bucket.End) != 0 && bytesGT(end, bucket.End)) { bucket.End = cloneBytes(end) } if bytesLT(start, bucket.Start) { bucket.Start = cloneBytes(start) } + bucket.metaMu.Unlock() } next.virtualForRoute[routeID] = bucket s.table.Store(next) @@ -296,9 +335,10 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool { } // RemoveRoute drops a RouteID from tracking. Counts accumulated since -// the last flush stay in the slot until the next Flush picks them up -// from the retired routeTable; subsequent Observe(routeID, …) calls -// are silent no-ops. Idempotent. +// the last flush are NOT lost: the retired slot (or, for virtual-bucket +// members, just the membership entry) is queued for one final drain by +// the next Flush. Subsequent Observe(routeID, …) calls are silent +// no-ops. Idempotent. func (s *MemSampler) RemoveRoute(routeID uint64) { if s == nil { return @@ -306,52 +346,95 @@ func (s *MemSampler) RemoveRoute(routeID uint64) { s.routesMu.Lock() defer s.routesMu.Unlock() cur := s.table.Load() - if _, ok := cur.slots[routeID]; !ok { - if _, ok := cur.virtualForRoute[routeID]; !ok { - return - } + individual, isIndividual := cur.slots[routeID] + bucket, isVirtual := cur.virtualForRoute[routeID] + if !isIndividual && !isVirtual { + return } + next := copyRouteTable(cur) delete(next.slots, routeID) delete(next.virtualForRoute, routeID) + + switch { + case isIndividual: + // Pending counters in this slot must be harvested by the next + // Flush; queue the slot rather than letting GC eat the counts. + s.retiredMu.Lock() + s.retiredSlots = append(s.retiredSlots, individual) + s.retiredMu.Unlock() + case isVirtual: + // Prune the removed RouteID from the bucket's MemberRoutes so + // later Snapshot rows do not advertise a stale member. + bucket.metaMu.Lock() + filtered := bucket.MemberRoutes[:0] + for _, m := range bucket.MemberRoutes { + if m != routeID { + filtered = append(filtered, m) + } + } + bucket.MemberRoutes = filtered + bucket.metaMu.Unlock() + } + next.sortedSlots = rebuildSorted(next) s.table.Store(next) } // Flush drains every slot's counters with atomic.SwapUint64 and // appends one MatrixColumn to the ring buffer. Idle slots (all -// counters zero) are skipped to keep the column compact. +// counters zero) are skipped to keep the column compact. Slots that +// RemoveRoute retired since the previous Flush are drained alongside +// the live table, so route churn does not silently lose counts. 
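+// Flush is intended to be driven by a single background goroutine (see
+// RunFlusher); concurrent calls would not corrupt state, but they
+// would split one step's counts across two columns.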
func (s *MemSampler) Flush() { if s == nil { return } - tbl := s.table.Load() col := MatrixColumn{At: s.now()} + tbl := s.table.Load() for _, slot := range tbl.sortedSlots { - reads := slot.reads.Swap(0) - writes := slot.writes.Swap(0) - readBytes := slot.readBytes.Swap(0) - writeBytes := slot.writeBytes.Swap(0) - if reads == 0 && writes == 0 && readBytes == 0 && writeBytes == 0 { - continue - } - col.Rows = append(col.Rows, MatrixRow{ - RouteID: slot.RouteID, - Start: slot.Start, - End: slot.End, - Aggregate: slot.Aggregate, - MemberRoutes: append([]uint64(nil), slot.MemberRoutes...), - Reads: reads, - Writes: writes, - ReadBytes: readBytes, - WriteBytes: writeBytes, - }) + col.Rows = appendDrainedRow(col.Rows, slot) + } + + s.retiredMu.Lock() + retired := s.retiredSlots + s.retiredSlots = nil + s.retiredMu.Unlock() + for _, slot := range retired { + col.Rows = appendDrainedRow(col.Rows, slot) } + s.historyMu.Lock() s.history.Push(col) s.historyMu.Unlock() } +// appendDrainedRow swaps the slot's counters to zero and appends a +// MatrixRow when any counter was non-zero. Idle slots are skipped. +// Metadata is read under the slot's metaMu so a concurrent +// RegisterRoute fold cannot race with the row materialisation. +func appendDrainedRow(rows []MatrixRow, slot *routeSlot) []MatrixRow { + reads := slot.reads.Swap(0) + writes := slot.writes.Swap(0) + readBytes := slot.readBytes.Swap(0) + writeBytes := slot.writeBytes.Swap(0) + if reads == 0 && writes == 0 && readBytes == 0 && writeBytes == 0 { + return rows + } + start, end, aggregate, members := slot.snapshotMeta() + return append(rows, MatrixRow{ + RouteID: slot.RouteID, + Start: start, + End: end, + Aggregate: aggregate, + MemberRoutes: members, + Reads: reads, + Writes: writes, + ReadBytes: readBytes, + WriteBytes: writeBytes, + }) +} + // Snapshot returns the matrix columns in the supplied [from, to) // half-open time range. Ordering is oldest-first. Series selects which // MatrixRow value the caller is going to display; the slot metadata diff --git a/keyviz/sampler_test.go b/keyviz/sampler_test.go index 1dc7b0857..a70fe2714 100644 --- a/keyviz/sampler_test.go +++ b/keyviz/sampler_test.go @@ -249,9 +249,6 @@ func TestRemoveRouteSilencesObserve(t *testing.T) { s.Observe(1, OpRead, 0, 0) // silently dropped s.Flush() cols := s.Snapshot(time.Time{}, time.Time{}) - // The pre-RemoveRoute Observe was on the now-retired slot, so the - // counter is already lost. We only assert that the post-Remove - // Observe didn't show up. for _, c := range cols { for _, r := range c.Rows { if r.RouteID == 1 && r.Reads > 1 { @@ -261,6 +258,136 @@ func TestRemoveRouteSilencesObserve(t *testing.T) { } } +// TestRemoveRouteHarvestsPendingCounts pins Codex P2: counts +// accumulated between the last flush and RemoveRoute must not be +// silently dropped. The retired slot is queued for one final drain. +func TestRemoveRouteHarvestsPendingCounts(t *testing.T) { + t.Parallel() + s, _ := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 4}) + s.RegisterRoute(1, []byte("a"), []byte("b")) + for i := 0; i < 7; i++ { + s.Observe(1, OpRead, 1, 0) + } + // Remove BEFORE flushing — pending 7 reads must surface. 
+ s.RemoveRoute(1) + s.Flush() + + cols := s.Snapshot(time.Time{}, time.Time{}) + if len(cols) != 1 { + t.Fatalf("len(cols) = %d, want 1", len(cols)) + } + rows := cols[0].Rows + if len(rows) != 1 || rows[0].RouteID != 1 || rows[0].Reads != 7 { + t.Fatalf("rows = %+v, want one row with route=1 reads=7", rows) + } +} + +// TestRemoveVirtualMemberPrunesMemberRoutes pins Codex P2: when a +// coarsened (virtual-bucket) RouteID is removed, the bucket's +// MemberRoutes list must drop that ID so later snapshots don't +// advertise stale members. +func TestRemoveVirtualMemberPrunesMemberRoutes(t *testing.T) { + t.Parallel() + s := setupVirtualBucketWithThreeMembers(t) + s.RemoveRoute(2) + s.Observe(3, OpRead, 1, 0) + s.Flush() + + agg := findAggregateAcrossSnapshot(t, s.Snapshot(time.Time{}, time.Time{})) + for _, m := range agg.MemberRoutes { + if m == 2 { + t.Fatalf("removed route 2 still in MemberRoutes: %v", agg.MemberRoutes) + } + } + if len(agg.MemberRoutes) != 1 || agg.MemberRoutes[0] != 3 { + t.Fatalf("MemberRoutes = %v, want [3]", agg.MemberRoutes) + } +} + +func setupVirtualBucketWithThreeMembers(t *testing.T) *MemSampler { + t.Helper() + s, _ := newTestSampler(t, MemSamplerOptions{ + Step: time.Second, + HistoryColumns: 4, + MaxTrackedRoutes: 1, + }) + if !s.RegisterRoute(1, []byte("a"), []byte("c")) { + t.Fatal("route 1 should fit") + } + if s.RegisterRoute(2, []byte("c"), []byte("e")) { + t.Fatal("route 2 should fold (over budget)") + } + if s.RegisterRoute(3, []byte("e"), []byte("g")) { + t.Fatal("route 3 should fold (over budget)") + } + return s +} + +func findAggregateAcrossSnapshot(t *testing.T, cols []MatrixColumn) MatrixRow { + t.Helper() + for _, c := range cols { + for _, r := range c.Rows { + if r.Aggregate { + return r + } + } + } + t.Fatal("no aggregate row in snapshot") + return MatrixRow{} +} + +// TestRegisterDoesNotRaceFlushOnVirtualBucket pins Codex P1: folding +// a new route into an existing virtual bucket must not race a +// concurrent Flush iterating the slot's metadata. -race detector +// makes the regression observable. +func TestRegisterDoesNotRaceFlushOnVirtualBucket(t *testing.T) { + t.Parallel() + s, _ := newTestSampler(t, MemSamplerOptions{ + Step: time.Millisecond, + HistoryColumns: 1024, + MaxTrackedRoutes: 1, + }) + if !s.RegisterRoute(1, []byte("a"), []byte("b")) { + t.Fatal("route 1 should fit") + } + // Seed a virtual bucket so subsequent Registers fold into it. + if s.RegisterRoute(2, []byte("c"), []byte("d")) { + t.Fatal("route 2 should fold") + } + + stop := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for n := 3; ; n++ { + select { + case <-stop: + return + default: + } + s.RegisterRoute(uint64(n), []byte{byte(n)}, []byte{byte(n + 1)}) //nolint:gosec // bounded test loop. + } + }() + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + } + s.Flush() + } + }() + + time.Sleep(50 * time.Millisecond) + close(stop) + wg.Wait() + // No assertion needed — race detector failures are the test signal. +} + func TestObserveOnUnknownRouteIsNoop(t *testing.T) { t.Parallel() s, _ := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 4}) From 347afd14e8f47109262daf94f62a45b97446973c Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 01:08:28 +0900 Subject: [PATCH 03/13] keyviz: deep-copy snapshot rows + slot bounds Codex round-2 P2: Snapshot consumers were aliasing live sampler state. 
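(Illustration: a caller doing cols[0].Rows[0].Start[0] = 'X' on a
Snapshot result previously wrote through to the live slot's bounds.)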
snapshotMeta returned Start/End by reference (so flushed rows tracked
later RegisterRoute mutations) and ringBuffer.Range only copied
MatrixColumn structs (so concurrent Flush could rotate the underlying
Rows slice, and caller mutation could corrupt stored history).

Clone Start/End in snapshotMeta and add a cloneColumn helper that
ringBuffer.Range uses to materialise a fully-owned column per call.
Regression test TestSnapshotReturnsDeepCopy mutates a returned row's
bounds and verifies a follow-up snapshot is unaffected.
---
 keyviz/ring_buffer.go  | 25 ++++++++++++++++++++++---
 keyviz/sampler.go      |  9 +++++----
 keyviz/sampler_test.go | 31 +++++++++++++++++++++++++++++++
 3 files changed, 58 insertions(+), 7 deletions(-)

diff --git a/keyviz/ring_buffer.go b/keyviz/ring_buffer.go
index e1eedaf5f..a3dcc6162 100644
--- a/keyviz/ring_buffer.go
+++ b/keyviz/ring_buffer.go
@@ -42,8 +42,10 @@ func (r *ringBuffer) Push(col MatrixColumn) {
 
 // Range returns the columns whose At falls in [from, to), oldest
 // first. Either bound may be the zero Time, meaning unbounded on that
-// side. The returned slice is freshly allocated; callers may mutate
-// it freely.
+// side. The returned slice is a deep copy: each column has its own
+// Rows slice and each row owns its Start/End byte slices, so callers
+// may mutate any field without corrupting stored history or racing
+// with concurrent flushes.
 func (r *ringBuffer) Range(from, to time.Time) []MatrixColumn {
 	all := r.snapshotOrdered()
 	// snapshotOrdered is already chronologically ordered.
@@ -63,10 +65,27 @@ func (r *ringBuffer) Range(from, to time.Time) []MatrixColumn {
 		return nil
 	}
 	out := make([]MatrixColumn, hi-lo)
-	copy(out, all[lo:hi])
+	for i, src := range all[lo:hi] {
+		out[i] = cloneColumn(src)
+	}
 	return out
 }
 
+// cloneColumn returns a deep copy of col: a fresh Rows slice with
+// each row's Start/End and MemberRoutes independently allocated.
+func cloneColumn(col MatrixColumn) MatrixColumn {
+	rows := make([]MatrixRow, len(col.Rows))
+	for i, row := range col.Rows {
+		rows[i] = row
+		rows[i].Start = cloneBytes(row.Start)
+		rows[i].End = cloneBytes(row.End)
+		if len(row.MemberRoutes) > 0 {
+			rows[i].MemberRoutes = append([]uint64(nil), row.MemberRoutes...)
+		}
+	}
+	return MatrixColumn{At: col.At, Rows: rows}
+}
+
 // snapshotOrdered returns a chronologically ordered (oldest first)
 // view of the buffer contents in a fresh slice.
 func (r *ringBuffer) snapshotOrdered() []MatrixColumn {
diff --git a/keyviz/sampler.go b/keyviz/sampler.go
index 08ec90535..9b4835fc3 100644
--- a/keyviz/sampler.go
+++ b/keyviz/sampler.go
@@ -161,13 +161,14 @@ type routeSlot struct {
 
 // snapshotMeta returns a defensive copy of the slot's metadata under
 // the read lock. Used by Flush so the row it emits doesn't share
-// MemberRoutes with the live slot (which a later RegisterRoute may
-// extend).
+// Start/End/MemberRoutes with the live slot (which a later
+// RegisterRoute may extend, and which the snapshot API exports to
+// external consumers that may mutate the bounds).
 func (s *routeSlot) snapshotMeta() (start, end []byte, aggregate bool, members []uint64) {
 	s.metaMu.RLock()
 	defer s.metaMu.RUnlock()
-	start = s.Start
-	end = s.End
+	start = cloneBytes(s.Start)
+	end = cloneBytes(s.End)
 	aggregate = s.Aggregate
 	if len(s.MemberRoutes) > 0 {
 		members = append([]uint64(nil), s.MemberRoutes...)
diff --git a/keyviz/sampler_test.go b/keyviz/sampler_test.go index a70fe2714..a181f59d5 100644 --- a/keyviz/sampler_test.go +++ b/keyviz/sampler_test.go @@ -454,3 +454,34 @@ func columnTimes(cols []MatrixColumn) []time.Time { } return out } + +// TestSnapshotReturnsDeepCopy guards the public-API contract that the +// Snapshot result is fully owned by the caller: mutating row bounds or +// member-route slices must not corrupt later snapshots, and must not +// race with concurrent Flush/RegisterRoute. +func TestSnapshotReturnsDeepCopy(t *testing.T) { + t.Parallel() + s, _ := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 4}) + if !s.RegisterRoute(1, []byte("aaaa"), []byte("bbbb")) { + t.Fatal("RegisterRoute(1) returned false") + } + s.Observe(1, OpRead, 1, 2) + s.Flush() + + first := s.Snapshot(time.Time{}, time.Time{}) + if len(first) == 0 || len(first[0].Rows) == 0 { + t.Fatalf("expected at least one row in first snapshot, got %+v", first) + } + first[0].Rows[0].Start[0] = 'X' + first[0].Rows[0].End[0] = 'Y' + first[0].Rows = nil + + second := s.Snapshot(time.Time{}, time.Time{}) + if len(second) == 0 || len(second[0].Rows) == 0 { + t.Fatalf("second snapshot lost rows after caller mutation: %+v", second) + } + r := second[0].Rows[0] + if string(r.Start) != "aaaa" || string(r.End) != "bbbb" { + t.Fatalf("snapshot bounds aliased live state: start=%q end=%q", r.Start, r.End) + } +} From 7c6ce9560bcd5c09c0cd076b4d78799c2b58440e Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 01:18:04 +0900 Subject: [PATCH 04/13] keyviz: retired-slot grace period and fold reorder MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex round-3 P1: RemoveRoute previously queued the retired slot for a single Flush and then dropped it. An Observe goroutine that loaded the pre-RemoveRoute table snapshot could still complete its atomic.Add into the slot after that drain, silently losing the increment. Hold each retired slot for retainedFlushes (=2) cycles instead — the late writer is harvested by the second drain. Codex round-3 P2: Folding an over-budget route into an existing virtual bucket may lower bucket.Start, but the bucket kept its old position in next.sortedSlots, so Flush emitted matrix rows out of key order. Rebuild sortedSlots whenever Start is lowered, restoring the API ordering contract. Refactor: extracted foldIntoBucket helper so RegisterRoute stays under the cyclop budget. Regression tests TestRetiredSlotGracePeriod and TestRegisterFoldLowerStartReorders cover both fixes. --- keyviz/sampler.go | 87 +++++++++++++++++++++---------- keyviz/sampler_test.go | 115 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 175 insertions(+), 27 deletions(-) diff --git a/keyviz/sampler.go b/keyviz/sampler.go index 9b4835fc3..cd1440aa3 100644 --- a/keyviz/sampler.go +++ b/keyviz/sampler.go @@ -109,13 +109,29 @@ type MemSampler struct { history *ringBuffer // retiredSlots holds slots that RemoveRoute removed from the live - // table since the last Flush. Flush drains and clears the list so - // counts accumulated between (last flush, RemoveRoute) are not - // silently dropped during route churn. + // table. Each entry is drained for `remaining` Flushes — a grace + // period that lets late Observe writers (which loaded the route + // table before RemoveRoute's atomic.Store) finish their Add into + // the slot before we forget about it. retainedFlushes controls the + // length of that window. 
retiredMu sync.Mutex - retiredSlots []*routeSlot + retiredSlots []retiredSlot } +// retiredSlot tracks a removed slot through its post-removal grace +// period. remaining counts down to zero across successive Flushes. +type retiredSlot struct { + slot *routeSlot + remaining int +} + +// retainedFlushes is how many Flush cycles a retired slot stays in +// the drain queue after RemoveRoute. Two cycles is enough in practice: +// any goroutine still holding a pre-RemoveRoute table snapshot will +// have completed its single atomic.Add long before the second flush +// tick (Step is at human timescales, microseconds suffice). +const retainedFlushes = 2 + // routeTable is the COW snapshot Observe operates on. Once published // via MemSampler.table.Store, fields are read-only. type routeTable struct { @@ -314,27 +330,36 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool { } next.sortedSlots = appendSorted(next.sortedSlots, bucket) } else { - // Mutate the existing bucket under its metaMu so a concurrent - // Flush iterating the previous table's snapshot doesn't observe - // a half-extended MemberRoutes slice or a partially-updated - // Start/End. Counters stay in place — they live next to the - // metadata fields but are protected by the atomic ops, not the - // mutex. - bucket.metaMu.Lock() - bucket.MemberRoutes = append(bucket.MemberRoutes, routeID) - if len(end) == 0 || (len(bucket.End) != 0 && bytesGT(end, bucket.End)) { - bucket.End = cloneBytes(end) - } - if bytesLT(start, bucket.Start) { - bucket.Start = cloneBytes(start) - } - bucket.metaMu.Unlock() + foldIntoBucket(next, bucket, routeID, start, end) } next.virtualForRoute[routeID] = bucket s.table.Store(next) return false } +// foldIntoBucket extends an existing virtual bucket to cover routeID's +// [start, end). Mutates bucket under its metaMu so a concurrent Flush +// iterating the previous table's snapshot doesn't observe a +// half-extended MemberRoutes slice or partially-updated Start/End. +// Counters live next to the metadata but are protected by their own +// atomic ops, not metaMu. If Start is lowered, sortedSlots is rebuilt +// to preserve Flush's key-order contract. +func foldIntoBucket(next *routeTable, bucket *routeSlot, routeID uint64, start, end []byte) { + bucket.metaMu.Lock() + bucket.MemberRoutes = append(bucket.MemberRoutes, routeID) + if len(end) == 0 || (len(bucket.End) != 0 && bytesGT(end, bucket.End)) { + bucket.End = cloneBytes(end) + } + startLowered := bytesLT(start, bucket.Start) + if startLowered { + bucket.Start = cloneBytes(start) + } + bucket.metaMu.Unlock() + if startLowered { + next.sortedSlots = rebuildSorted(next) + } +} + // RemoveRoute drops a RouteID from tracking. Counts accumulated since // the last flush are NOT lost: the retired slot (or, for virtual-bucket // members, just the membership entry) is queued for one final drain by @@ -359,10 +384,14 @@ func (s *MemSampler) RemoveRoute(routeID uint64) { switch { case isIndividual: - // Pending counters in this slot must be harvested by the next - // Flush; queue the slot rather than letting GC eat the counts. + // Pending counters in this slot must be harvested by upcoming + // Flush cycles. Drain across `retainedFlushes` cycles so an + // Observe call that loaded the prior table just before this + // atomic.Store can still complete its Add into the slot — that + // in-flight write is caught by the next drain rather than + // silently lost. 
s.retiredMu.Lock() - s.retiredSlots = append(s.retiredSlots, individual) + s.retiredSlots = append(s.retiredSlots, retiredSlot{slot: individual, remaining: retainedFlushes}) s.retiredMu.Unlock() case isVirtual: // Prune the removed RouteID from the bucket's MemberRoutes so @@ -398,12 +427,16 @@ func (s *MemSampler) Flush() { } s.retiredMu.Lock() - retired := s.retiredSlots - s.retiredSlots = nil - s.retiredMu.Unlock() - for _, slot := range retired { - col.Rows = appendDrainedRow(col.Rows, slot) + keep := s.retiredSlots[:0] + for _, r := range s.retiredSlots { + col.Rows = appendDrainedRow(col.Rows, r.slot) + r.remaining-- + if r.remaining > 0 { + keep = append(keep, r) + } } + s.retiredSlots = keep + s.retiredMu.Unlock() s.historyMu.Lock() s.history.Push(col) diff --git a/keyviz/sampler_test.go b/keyviz/sampler_test.go index a181f59d5..684293416 100644 --- a/keyviz/sampler_test.go +++ b/keyviz/sampler_test.go @@ -455,6 +455,121 @@ func columnTimes(cols []MatrixColumn) []time.Time { return out } +// TestRetiredSlotGracePeriod asserts that a retired slot stays in the +// drain queue for `retainedFlushes` cycles so an Observe goroutine +// that loaded the pre-RemoveRoute table snapshot can complete its +// atomic.Add into the slot and still have the increment harvested by +// a subsequent Flush. We simulate the late writer by reaching into +// the retired-slot queue directly and bumping the counter between the +// two flushes. +func TestRetiredSlotGracePeriod(t *testing.T) { + t.Parallel() + s, _ := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 8}) + if !s.RegisterRoute(1, []byte("a"), []byte("b")) { + t.Fatal("RegisterRoute(1) returned false") + } + s.Observe(1, OpRead, 1, 2) + s.RemoveRoute(1) + + lateSlot := graceQueueSingleSlot(t, s) + s.Flush() + lateSlot.reads.Add(7) + s.Flush() + + total := totalReadsForRoute(s.Snapshot(time.Time{}, time.Time{}), 1) + if total != 1+7 { + t.Fatalf("expected reads 8 (pre-remove + late-writer), got %d", total) + } + + s.retiredMu.Lock() + leftover := len(s.retiredSlots) + s.retiredMu.Unlock() + if leftover != 0 { + t.Fatalf("retired slot not released after grace, len=%d", leftover) + } +} + +func graceQueueSingleSlot(t *testing.T, s *MemSampler) *routeSlot { + t.Helper() + s.retiredMu.Lock() + defer s.retiredMu.Unlock() + if len(s.retiredSlots) != 1 { + t.Fatalf("expected 1 retired slot, got %d", len(s.retiredSlots)) + } + return s.retiredSlots[0].slot +} + +func totalReadsForRoute(cols []MatrixColumn, routeID uint64) uint64 { + var total uint64 + for _, c := range cols { + for _, r := range c.Rows { + if r.RouteID == routeID { + total += r.Reads + } + } + } + return total +} + +// TestRegisterFoldLowerStartReorders guards the sortedSlots ordering +// invariant: when an over-budget route folds into an existing virtual +// bucket and lowers the bucket's Start, the bucket must be repositioned +// in sortedSlots so Flush emits matrix rows in key order. +func TestRegisterFoldLowerStartReorders(t *testing.T) { + t.Parallel() + s, _ := newTestSampler(t, MemSamplerOptions{ + Step: time.Second, + HistoryColumns: 4, + MaxTrackedRoutes: 2, + }) + mustRegister(t, s, 1, "m", "n") + mustRegister(t, s, 2, "p", "q") + // Route 3 is over budget — creates a virtual bucket at Start=r. + if s.RegisterRoute(3, []byte("r"), []byte("s")) { + t.Fatal("route 3 over budget should fold into virtual bucket") + } + // Route 4 is over budget AND has a Start ("a") below the bucket's + // existing Start ("r"). 
The fold must lower bucket.Start to "a"
+	// AND reposition the bucket within sortedSlots.
+	if s.RegisterRoute(4, []byte("a"), []byte("b")) {
+		t.Fatal("route 4 over budget should fold into virtual bucket")
+	}
+	s.Observe(1, OpRead, 0, 0)
+	s.Observe(2, OpRead, 0, 0)
+	s.Observe(3, OpRead, 0, 0)
+	s.Observe(4, OpRead, 0, 0)
+	s.Flush()
+
+	rows := flushedRowsSorted(t, s)
+	agg := findAggregateRow(t, rows)
+	if string(agg.Start) != "a" {
+		t.Fatalf("aggregate.Start = %q, want %q (fold did not lower Start)", agg.Start, "a")
+	}
+}
+
+func mustRegister(t *testing.T, s *MemSampler, routeID uint64, start, end string) {
+	t.Helper()
+	if !s.RegisterRoute(routeID, []byte(start), []byte(end)) {
+		t.Fatalf("RegisterRoute(%d) returned false", routeID)
+	}
+}
+
+func flushedRowsSorted(t *testing.T, s *MemSampler) []MatrixRow {
+	t.Helper()
+	cols := s.Snapshot(time.Time{}, time.Time{})
+	if len(cols) == 0 || len(cols[0].Rows) == 0 {
+		t.Fatal("no rows after flush")
+	}
+	rows := cols[0].Rows
+	for i := 1; i < len(rows); i++ {
+		if !bytesLE(rows[i-1].Start, rows[i].Start) {
+			t.Fatalf("rows not sorted: rows[%d].Start=%q > rows[%d].Start=%q",
+				i-1, rows[i-1].Start, i, rows[i].Start)
+		}
+	}
+	return rows
+}
+
 // TestSnapshotReturnsDeepCopy guards the public-API contract that the
 // Snapshot result is fully owned by the caller: mutating row bounds or
 // member-route slices must not corrupt later snapshots, and must not

From 156e9f50b70f8f70cdb998c3273eb752cda7cb5e Mon Sep 17 00:00:00 2001
From: "Yoshiaki Ueda (bootjp)" 
Date: Sun, 26 Apr 2026 01:32:20 +0900
Subject: [PATCH 05/13] keyviz: sort flushed rows + defer virtual-member prune
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Round-4 review fixes for PR #639:

Codex P2 (sampler.go:419-444) and CodeRabbit duplicate finding: Flush
appended retired-slot rows after the live sortedSlots scan, so a
removed route with a Start between two live slots produced a column
whose Rows were not monotone by Start. Add a final sort.SliceStable
on col.Rows so the API contract holds across the retainedFlushes
window. TestFlushSortsMixedLiveAndRetiredRows is the regression.

Codex P2 (sampler.go:404): Removing a virtual-bucket member pruned
the routeID from MemberRoutes immediately, but the bucket counters
still held that route's pre-removal increments — the next flushed row
mixed the removed route's traffic into the bucket while advertising
only the remaining members. Defer the prune across retainedFlushes
via a pendingPrunes queue so the row attribution stays accurate
during the grace window. TestRemoveVirtualMemberPruneDeferred + an
updated TestRemoveVirtualMemberPrunesMemberRoutes pin both halves of
the contract.

CodeRabbit nits: package comment said Observe does four
atomic.AddUint64 calls (it's at most two); NewMemSampler doc claimed
it returns nil for negative HistoryColumns (it never does —
newRingBuffer clamps). Updated both.

Added a Step() accessor so callers wiring RunFlusher can read the
configured interval rather than threading it through two config
paths.

Refactored Flush into drainRetiredSlots + advancePendingPrunes
helpers to keep cyclomatic complexity under budget.
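For reviewers who want the new contract in one place: every flushed
column is now monotone by Start, so matrix consumers may rely on the
ordering (e.g. binary-search rows). A minimal consumer-side check — a
sketch against the exported MatrixColumn/MatrixRow types, not code in
this PR — would be:

    import "bytes"

    // rowsMonotone reports whether a column's rows are sorted by
    // Start, the invariant Flush guarantees after this commit.
    func rowsMonotone(col keyviz.MatrixColumn) bool {
        for i := 1; i < len(col.Rows); i++ {
            if bytes.Compare(col.Rows[i-1].Start, col.Rows[i].Start) > 0 {
                return false
            }
        }
        return true
    }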
--- keyviz/sampler.go | 145 ++++++++++++++++++++++++++++++++--------- keyviz/sampler_test.go | 88 ++++++++++++++++++++----- 2 files changed, 184 insertions(+), 49 deletions(-) diff --git a/keyviz/sampler.go b/keyviz/sampler.go index cd1440aa3..536fe9d5e 100644 --- a/keyviz/sampler.go +++ b/keyviz/sampler.go @@ -15,8 +15,10 @@ // Hot-path properties (see design §5.1, §10): // // - Observe is a single atomic.Pointer[routeTable].Load, a plain map -// lookup against an immutable snapshot, and four atomic.AddUint64 -// calls. No allocation, no mutex. +// lookup against an immutable snapshot, and at most two +// atomic.AddUint64 calls (one for the count, one for bytes — +// skipped when both keyLen and valueLen are zero). No allocation, +// no mutex. // - Flush drains the per-route counters with atomic.SwapUint64; no // pointer retirement, so a late writer cannot race past the snapshot // and lose counts. @@ -113,9 +115,15 @@ type MemSampler struct { // period that lets late Observe writers (which loaded the route // table before RemoveRoute's atomic.Store) finish their Add into // the slot before we forget about it. retainedFlushes controls the - // length of that window. - retiredMu sync.Mutex - retiredSlots []retiredSlot + // length of that window. pendingPrunes is the same idea for + // virtual-bucket member-route removals: the routeID stays in + // MemberRoutes during the grace window so the row attribution + // stays correct while the bucket counters still include the + // removed route's pre-removal increments. Both are guarded by + // retiredMu since they are only touched off the hot path. + retiredMu sync.Mutex + retiredSlots []retiredSlot + pendingPrunes []memberPrune } // retiredSlot tracks a removed slot through its post-removal grace @@ -125,11 +133,23 @@ type retiredSlot struct { remaining int } -// retainedFlushes is how many Flush cycles a retired slot stays in -// the drain queue after RemoveRoute. Two cycles is enough in practice: -// any goroutine still holding a pre-RemoveRoute table snapshot will -// have completed its single atomic.Add long before the second flush -// tick (Step is at human timescales, microseconds suffice). +// memberPrune tracks a deferred virtual-bucket member-route removal. +// remaining counts down across Flushes; while it stays positive the +// routeID is still advertised in bucket.MemberRoutes so the flushed +// row's attribution matches the bucket counters that still include +// pre-removal increments from this route. +type memberPrune struct { + bucket *routeSlot + routeID uint64 + remaining int +} + +// retainedFlushes is how many Flush cycles a retired slot (or pending +// member-prune) stays in the queue after RemoveRoute. Two cycles is +// enough in practice: any goroutine still holding a pre-RemoveRoute +// table snapshot will have completed its single atomic.Add long +// before the second flush tick (Step is at human timescales, +// microseconds suffice). const retainedFlushes = 2 // routeTable is the COW snapshot Observe operates on. Once published @@ -213,9 +233,10 @@ type MatrixRow struct { } // NewMemSampler constructs a sampler with the supplied options. Zero -// fields fall back to the Default* constants. Returns nil only if -// opts.HistoryColumns is explicitly negative; callers should pass a -// zero options struct for the default configuration. +// fields fall back to the Default* constants; non-positive values +// (including explicitly-negative HistoryColumns) are clamped to a safe +// minimum by newRingBuffer. 
Always returns a usable sampler — callers +// should pass a zero options struct for the default configuration. func NewMemSampler(opts MemSamplerOptions) *MemSampler { if opts.Step <= 0 { opts.Step = DefaultStep @@ -394,17 +415,16 @@ func (s *MemSampler) RemoveRoute(routeID uint64) { s.retiredSlots = append(s.retiredSlots, retiredSlot{slot: individual, remaining: retainedFlushes}) s.retiredMu.Unlock() case isVirtual: - // Prune the removed RouteID from the bucket's MemberRoutes so - // later Snapshot rows do not advertise a stale member. - bucket.metaMu.Lock() - filtered := bucket.MemberRoutes[:0] - for _, m := range bucket.MemberRoutes { - if m != routeID { - filtered = append(filtered, m) - } - } - bucket.MemberRoutes = filtered - bucket.metaMu.Unlock() + // Defer pruning until after the bucket's pre-removal counters + // have been drained. While the prune is pending the routeID + // stays in MemberRoutes so the next few Flush rows attribute + // the bucket's mixed counters to all members that contributed + // to them — including the route we are removing. + s.retiredMu.Lock() + s.pendingPrunes = append(s.pendingPrunes, memberPrune{ + bucket: bucket, routeID: routeID, remaining: retainedFlushes, + }) + s.retiredMu.Unlock() } next.sortedSlots = rebuildSorted(next) @@ -416,6 +436,11 @@ func (s *MemSampler) RemoveRoute(routeID uint64) { // counters zero) are skipped to keep the column compact. Slots that // RemoveRoute retired since the previous Flush are drained alongside // the live table, so route churn does not silently lose counts. +// +// Rows are emitted in Start-key order regardless of which slot list +// they came from (live, retired, or virtual-member-pruned), preserving +// the API contract that matrix consumers can rely on monotone Start +// across columns. func (s *MemSampler) Flush() { if s == nil { return @@ -427,20 +452,76 @@ func (s *MemSampler) Flush() { } s.retiredMu.Lock() - keep := s.retiredSlots[:0] - for _, r := range s.retiredSlots { - col.Rows = appendDrainedRow(col.Rows, r.slot) + s.retiredSlots = drainRetiredSlots(s.retiredSlots, &col.Rows) + s.pendingPrunes = advancePendingPrunes(s.pendingPrunes) + s.retiredMu.Unlock() + + sort.SliceStable(col.Rows, func(i, j int) bool { + return bytesLT(col.Rows[i].Start, col.Rows[j].Start) + }) + + s.historyMu.Lock() + s.history.Push(col) + s.historyMu.Unlock() +} + +// drainRetiredSlots emits a row for each retired slot, decrements the +// per-entry grace counter, and returns the surviving entries (in the +// original backing array). Rows are appended to *rows; the slice +// header is updated through the pointer so the caller observes the +// growth. +func drainRetiredSlots(retired []retiredSlot, rows *[]MatrixRow) []retiredSlot { + keep := retired[:0] + for _, r := range retired { + *rows = appendDrainedRow(*rows, r.slot) r.remaining-- if r.remaining > 0 { keep = append(keep, r) } } - s.retiredSlots = keep - s.retiredMu.Unlock() + return keep +} - s.historyMu.Lock() - s.history.Push(col) - s.historyMu.Unlock() +// advancePendingPrunes decrements each entry's grace counter; entries +// whose counter reaches zero have their routeID actually pruned from +// the bucket's MemberRoutes. Returns the surviving entries. 
+func advancePendingPrunes(pending []memberPrune) []memberPrune { + keep := pending[:0] + for _, p := range pending { + p.remaining-- + if p.remaining > 0 { + keep = append(keep, p) + continue + } + pruneMemberRoute(p.bucket, p.routeID) + } + return keep +} + +// pruneMemberRoute removes routeID from bucket.MemberRoutes under the +// bucket's metaMu so a concurrent snapshotMeta reader sees a +// consistent view. +func pruneMemberRoute(bucket *routeSlot, routeID uint64) { + bucket.metaMu.Lock() + defer bucket.metaMu.Unlock() + filtered := bucket.MemberRoutes[:0] + for _, m := range bucket.MemberRoutes { + if m != routeID { + filtered = append(filtered, m) + } + } + bucket.MemberRoutes = filtered +} + +// Step returns the configured flush interval after applying default +// fallbacks. Callers wiring up RunFlusher can use this to align their +// ticker with the sampler's expectations rather than passing the +// interval through two configuration paths. +func (s *MemSampler) Step() time.Duration { + if s == nil { + return DefaultStep + } + return s.opts.Step } // appendDrainedRow swaps the slot's counters to zero and appends a diff --git a/keyviz/sampler_test.go b/keyviz/sampler_test.go index 684293416..b429abe6f 100644 --- a/keyviz/sampler_test.go +++ b/keyviz/sampler_test.go @@ -284,19 +284,29 @@ func TestRemoveRouteHarvestsPendingCounts(t *testing.T) { // TestRemoveVirtualMemberPrunesMemberRoutes pins Codex P2: when a // coarsened (virtual-bucket) RouteID is removed, the bucket's -// MemberRoutes list must drop that ID so later snapshots don't -// advertise stale members. +// MemberRoutes list must eventually drop that ID. Pruning is deferred +// across retainedFlushes cycles so the row attribution stays correct +// while the bucket counters still include the removed route's +// pre-removal increments — TestRemoveVirtualMemberPruneDeferred pins +// the grace-window half of this contract. func TestRemoveVirtualMemberPrunesMemberRoutes(t *testing.T) { t.Parallel() s := setupVirtualBucketWithThreeMembers(t) s.RemoveRoute(2) + for i := 0; i < retainedFlushes; i++ { + s.Observe(3, OpRead, 1, 0) + s.Flush() + } + // One more flush after the grace window to capture the post-prune + // MemberRoutes state in a row. s.Observe(3, OpRead, 1, 0) s.Flush() - agg := findAggregateAcrossSnapshot(t, s.Snapshot(time.Time{}, time.Time{})) + cols := s.Snapshot(time.Time{}, time.Time{}) + agg := cols[len(cols)-1].Rows[0] for _, m := range agg.MemberRoutes { if m == 2 { - t.Fatalf("removed route 2 still in MemberRoutes: %v", agg.MemberRoutes) + t.Fatalf("after grace, removed route 2 still in MemberRoutes: %v", agg.MemberRoutes) } } if len(agg.MemberRoutes) != 1 || agg.MemberRoutes[0] != 3 { @@ -304,6 +314,44 @@ func TestRemoveVirtualMemberPrunesMemberRoutes(t *testing.T) { } } +// TestRemoveVirtualMemberPruneDeferred pins the deferred-prune +// contract: the removed routeID stays in MemberRoutes for the +// retainedFlushes grace window so flushed rows whose counters still +// include that route's pre-removal increments attribute the traffic +// correctly. 
+func TestRemoveVirtualMemberPruneDeferred(t *testing.T) { + t.Parallel() + s := setupVirtualBucketWithThreeMembers(t) + s.RemoveRoute(2) + for i := 0; i < retainedFlushes; i++ { + s.Observe(3, OpRead, 1, 0) + s.Flush() + col := lastSnapshotColumn(t, s) + agg := findAggregateRow(t, col.Rows) + if !memberRoutesContain(agg.MemberRoutes, 2) { + t.Fatalf("flush %d within grace dropped route 2 too early: %v", i, agg.MemberRoutes) + } + } +} + +func lastSnapshotColumn(t *testing.T, s *MemSampler) MatrixColumn { + t.Helper() + cols := s.Snapshot(time.Time{}, time.Time{}) + if len(cols) == 0 { + t.Fatal("snapshot empty") + } + return cols[len(cols)-1] +} + +func memberRoutesContain(members []uint64, target uint64) bool { + for _, m := range members { + if m == target { + return true + } + } + return false +} + func setupVirtualBucketWithThreeMembers(t *testing.T) *MemSampler { t.Helper() s, _ := newTestSampler(t, MemSamplerOptions{ @@ -323,19 +371,6 @@ func setupVirtualBucketWithThreeMembers(t *testing.T) *MemSampler { return s } -func findAggregateAcrossSnapshot(t *testing.T, cols []MatrixColumn) MatrixRow { - t.Helper() - for _, c := range cols { - for _, r := range c.Rows { - if r.Aggregate { - return r - } - } - } - t.Fatal("no aggregate row in snapshot") - return MatrixRow{} -} - // TestRegisterDoesNotRaceFlushOnVirtualBucket pins Codex P1: folding // a new route into an existing virtual bucket must not race a // concurrent Flush iterating the slot's metadata. -race detector @@ -455,6 +490,25 @@ func columnTimes(cols []MatrixColumn) []time.Time { return out } +// TestFlushSortsMixedLiveAndRetiredRows pins the row-ordering +// invariant when retired-slot drains land in the same column as live +// slot drains. Without a final sort the retired rows would be appended +// in queue order, producing a column whose Rows are not monotone by +// Start. +func TestFlushSortsMixedLiveAndRetiredRows(t *testing.T) { + t.Parallel() + s, _ := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 4}) + mustRegister(t, s, 1, "a", "b") + mustRegister(t, s, 2, "m", "n") + mustRegister(t, s, 3, "z", "{") + s.Observe(2, OpRead, 0, 0) + s.RemoveRoute(2) + s.Observe(1, OpRead, 0, 0) + s.Observe(3, OpRead, 0, 0) + s.Flush() + flushedRowsSorted(t, s) +} + // TestRetiredSlotGracePeriod asserts that a retired slot stays in the // drain queue for `retainedFlushes` cycles so an Observe goroutine // that loaded the pre-RemoveRoute table snapshot can complete its From d39d31e5a0494ef4a523f5fce8d50bcb132f2905 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 01:44:17 +0900 Subject: [PATCH 06/13] keyviz: drain orphaned virtual bucket on last member removal Codex round-5 P1: When RemoveRoute removes the last member of a virtual bucket, delete(virtualForRoute, routeID) leaves no path back to the bucket, so rebuildSorted drops it from sortedSlots and Flush silently loses any pre-removal counters the bucket still holds (plus any in-flight late-Observe writers hitting the prior table snapshot). Detect the now-orphaned bucket via bucketStillReferenced and queue it on retiredSlots with the same retainedFlushes grace so Flush keeps draining it across the grace window. The pendingPrune entry already keeps MemberRoutes intact for those drains, so attribution remains correct. Regression test TestRemoveLastVirtualMemberHarvestsBucket sets MaxTrackedRoutes=1, observes the over-budget route, removes it, and asserts the bucket counts survive into the next snapshot column. 
--- keyviz/sampler.go | 24 ++++++++++++++++++++++++ keyviz/sampler_test.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/keyviz/sampler.go b/keyviz/sampler.go index 536fe9d5e..8645bb59a 100644 --- a/keyviz/sampler.go +++ b/keyviz/sampler.go @@ -424,6 +424,17 @@ func (s *MemSampler) RemoveRoute(routeID uint64) { s.pendingPrunes = append(s.pendingPrunes, memberPrune{ bucket: bucket, routeID: routeID, remaining: retainedFlushes, }) + // If this delete left the bucket with no remaining + // virtualForRoute mapping, rebuildSorted will drop it from the + // live sortedSlots. Queue it as a retired slot so Flush keeps + // draining its counters across retainedFlushes — otherwise + // pre-removal increments and any in-flight Observe writers + // hitting the prior table snapshot would be silently lost. + if !bucketStillReferenced(next.virtualForRoute, bucket) { + s.retiredSlots = append(s.retiredSlots, retiredSlot{ + slot: bucket, remaining: retainedFlushes, + }) + } s.retiredMu.Unlock() } @@ -498,6 +509,19 @@ func advancePendingPrunes(pending []memberPrune) []memberPrune { return keep } +// bucketStillReferenced reports whether any RouteID in +// virtualForRoute still maps to bucket. Used by RemoveRoute to detect +// when a virtual bucket has lost its last member and must be retired +// for grace draining. +func bucketStillReferenced(virtualForRoute map[uint64]*routeSlot, bucket *routeSlot) bool { + for _, b := range virtualForRoute { + if b == bucket { + return true + } + } + return false +} + // pruneMemberRoute removes routeID from bucket.MemberRoutes under the // bucket's metaMu so a concurrent snapshotMeta reader sees a // consistent view. diff --git a/keyviz/sampler_test.go b/keyviz/sampler_test.go index b429abe6f..a8261ac8e 100644 --- a/keyviz/sampler_test.go +++ b/keyviz/sampler_test.go @@ -490,6 +490,34 @@ func columnTimes(cols []MatrixColumn) []time.Time { return out } +// TestRemoveLastVirtualMemberHarvestsBucket pins Codex round-5 P1: +// removing the last member of a virtual bucket leaves the bucket +// orphaned in the route table — rebuildSorted no longer reaches it +// from virtualForRoute, so without the orphan-retire path Flush would +// silently lose any pre-removal counters the bucket still holds. +func TestRemoveLastVirtualMemberHarvestsBucket(t *testing.T) { + t.Parallel() + s, _ := newTestSampler(t, MemSamplerOptions{ + Step: time.Second, + HistoryColumns: 4, + MaxTrackedRoutes: 1, + }) + mustRegister(t, s, 1, "a", "b") + if s.RegisterRoute(2, []byte("c"), []byte("d")) { + t.Fatal("route 2 over budget should fold into virtual bucket") + } + s.Observe(2, OpRead, 5, 7) + s.RemoveRoute(2) + s.Flush() + + rows := lastSnapshotColumn(t, s).Rows + agg := findAggregateRow(t, rows) + if agg.Reads != 1 || agg.ReadBytes != 12 { + t.Fatalf("orphan bucket counts dropped: reads=%d bytes=%d (want 1, 12)", + agg.Reads, agg.ReadBytes) + } +} + // TestFlushSortsMixedLiveAndRetiredRows pins the row-ordering // invariant when retired-slot drains land in the same column as live // slot drains. Without a final sort the retired rows would be appended From 1b15db2e53929319d60424ac0bb23408832d9ec6 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 01:59:15 +0900 Subject: [PATCH 07/13] keyviz: tie late-writer grace to wall clock + benchmarks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex round-6 P2: the late-writer grace was driven by a flush count (retainedFlushes=2). 
At small Step values (millisecond scale) that window can expire
before a preempted Observe goroutine resumes, silently dropping its
atomic.Add into a slot that has just been freed from retiredSlots.

Switch to a wall-clock grace: each retiredSlot and memberPrune
carries its retiredAt timestamp; Flush keeps the entry while
now-retiredAt is below graceWindow = max(2*Step, 5s). The
minGraceWindow floor protects sub-second Step configurations.

Claude round-6 nits:

- HistoryColumns <= 0 (not just == 0) now falls back to the default,
  so a negative value doesn't silently land at cap=1.
- findVirtualBucket doc now describes the actual fallback (first
  aggregate in key order) instead of "closest to the right of start".
- Package doc no longer asserts ApplySplit/ApplyMerge as current
  callers — flagged as future PR.

Add BenchmarkObserveHit and BenchmarkObserveMiss so the hot-path
properties (no alloc, lockless) are pinned ahead of coordinator
wiring. Local M1 Max: 7 ns/op hit, 3.5 ns/op miss, 0 allocs.
---
 keyviz/sampler.go      | 114 +++++++++++++++++++++++++----------
 keyviz/sampler_test.go |  89 ++++++++++++++++++++----------
 2 files changed, 130 insertions(+), 73 deletions(-)

diff --git a/keyviz/sampler.go b/keyviz/sampler.go
index 8645bb59a..19f18ed8e 100644
--- a/keyviz/sampler.go
+++ b/keyviz/sampler.go
@@ -22,10 +22,11 @@
 // - Flush drains the per-route counters with atomic.SwapUint64; no
 //   pointer retirement, so a late writer cannot race past the snapshot
 //   and lose counts.
-// - Adding / removing routes (RegisterRoute, RemoveRoute, ApplySplit,
-//   ApplyMerge) builds a fresh routeTable copy under a non-hot-path
-//   mutex and publishes it with a single atomic.Pointer.Store. Routes
-//   mutated mid-step keep their counters in the new table by design.
+// - Adding / removing routes (RegisterRoute, RemoveRoute today;
+//   ApplySplit / ApplyMerge in a future PR) builds a fresh
+//   routeTable copy under a non-hot-path mutex and publishes it
+//   with a single atomic.Pointer.Store. Routes mutated mid-step
+//   keep their counters in the new table by design.
 package keyviz
 
 import (
@@ -127,30 +128,49 @@ type MemSampler struct {
 }
 
 // retiredSlot tracks a removed slot through its post-removal grace
-// period. remaining counts down to zero across successive Flushes.
+// period. retiredAt is the wall-clock instant of removal; the entry
+// is drained on every Flush whose `now()` is within the configured
+// grace window of retiredAt, then dropped.
 type retiredSlot struct {
 	slot *routeSlot
-	remaining int
+	retiredAt time.Time
 }
 
 // memberPrune tracks a deferred virtual-bucket member-route removal.
-// remaining counts down across Flushes; while it stays positive the
-// routeID is still advertised in bucket.MemberRoutes so the flushed
-// row's attribution matches the bucket counters that still include
-// pre-removal increments from this route.
+// retiredAt is the wall-clock instant of removal; while now() is
+// within the grace window, the routeID stays in bucket.MemberRoutes so
+// flushed rows continue attributing the bucket's mixed counters to all
+// members that contributed (including this one).
 type memberPrune struct {
 	bucket *routeSlot
 	routeID uint64
-	remaining int
+	retiredAt time.Time
 }
 
-// retainedFlushes is how many Flush cycles a retired slot (or pending
-// member-prune) stays in the queue after RemoveRoute.
Two cycles is -// enough in practice: any goroutine still holding a pre-RemoveRoute -// table snapshot will have completed its single atomic.Add long -// before the second flush tick (Step is at human timescales, -// microseconds suffice). -const retainedFlushes = 2 +// minGraceWindow is the wall-clock floor for late-writer grace. It +// guards against Step being configured small enough (millisecond +// scale) that a flush-count-based grace window would expire before a +// preempted Observe goroutine resumes. Any goroutine still holding a +// pre-RemoveRoute table snapshot must finish its single atomic.Add +// before now-retiredAt exceeds this window. +const minGraceWindow = 5 * time.Second + +// graceStepMultiplier is how many Step intervals we want the grace +// window to cover by default — picked so retired slots are drained +// at least twice before being dropped. +const graceStepMultiplier = 2 + +// graceWindow is the duration retired slots and pending member-prunes +// stay drainable after RemoveRoute. It's tied to wall-clock time, not +// Flush count, so a small Step doesn't shrink the grace below +// minGraceWindow. +func (s *MemSampler) graceWindow() time.Duration { + g := graceStepMultiplier * s.opts.Step + if g < minGraceWindow { + g = minGraceWindow + } + return g +} // routeTable is the COW snapshot Observe operates on. Once published // via MemSampler.table.Store, fields are read-only. @@ -241,7 +261,7 @@ func NewMemSampler(opts MemSamplerOptions) *MemSampler { if opts.Step <= 0 { opts.Step = DefaultStep } - if opts.HistoryColumns == 0 { + if opts.HistoryColumns <= 0 { opts.HistoryColumns = DefaultHistoryColumns } if opts.MaxTrackedRoutes == 0 { @@ -403,16 +423,17 @@ func (s *MemSampler) RemoveRoute(routeID uint64) { delete(next.slots, routeID) delete(next.virtualForRoute, routeID) + retiredAt := s.now() switch { case isIndividual: // Pending counters in this slot must be harvested by upcoming - // Flush cycles. Drain across `retainedFlushes` cycles so an - // Observe call that loaded the prior table just before this + // Flush cycles. Drain across the grace window so an Observe + // call that loaded the prior table just before this // atomic.Store can still complete its Add into the slot — that - // in-flight write is caught by the next drain rather than + // in-flight write is caught by a later drain rather than // silently lost. s.retiredMu.Lock() - s.retiredSlots = append(s.retiredSlots, retiredSlot{slot: individual, remaining: retainedFlushes}) + s.retiredSlots = append(s.retiredSlots, retiredSlot{slot: individual, retiredAt: retiredAt}) s.retiredMu.Unlock() case isVirtual: // Defer pruning until after the bucket's pre-removal counters @@ -422,17 +443,17 @@ func (s *MemSampler) RemoveRoute(routeID uint64) { // to them — including the route we are removing. s.retiredMu.Lock() s.pendingPrunes = append(s.pendingPrunes, memberPrune{ - bucket: bucket, routeID: routeID, remaining: retainedFlushes, + bucket: bucket, routeID: routeID, retiredAt: retiredAt, }) // If this delete left the bucket with no remaining // virtualForRoute mapping, rebuildSorted will drop it from the // live sortedSlots. Queue it as a retired slot so Flush keeps - // draining its counters across retainedFlushes — otherwise + // draining its counters across the grace window — otherwise // pre-removal increments and any in-flight Observe writers // hitting the prior table snapshot would be silently lost. 
if !bucketStillReferenced(next.virtualForRoute, bucket) { s.retiredSlots = append(s.retiredSlots, retiredSlot{ - slot: bucket, remaining: retainedFlushes, + slot: bucket, retiredAt: retiredAt, }) } s.retiredMu.Unlock() @@ -462,9 +483,10 @@ func (s *MemSampler) Flush() { col.Rows = appendDrainedRow(col.Rows, slot) } + grace := s.graceWindow() s.retiredMu.Lock() - s.retiredSlots = drainRetiredSlots(s.retiredSlots, &col.Rows) - s.pendingPrunes = advancePendingPrunes(s.pendingPrunes) + s.retiredSlots = drainRetiredSlots(s.retiredSlots, &col.Rows, col.At, grace) + s.pendingPrunes = advancePendingPrunes(s.pendingPrunes, col.At, grace) s.retiredMu.Unlock() sort.SliceStable(col.Rows, func(i, j int) bool { @@ -476,31 +498,30 @@ func (s *MemSampler) Flush() { s.historyMu.Unlock() } -// drainRetiredSlots emits a row for each retired slot, decrements the -// per-entry grace counter, and returns the surviving entries (in the -// original backing array). Rows are appended to *rows; the slice -// header is updated through the pointer so the caller observes the -// growth. -func drainRetiredSlots(retired []retiredSlot, rows *[]MatrixRow) []retiredSlot { +// drainRetiredSlots emits a row for each retired slot and returns the +// entries whose grace window has not yet elapsed. Rows are appended +// to *rows so the caller sees the slice growth. Entries whose +// elapsed time (now - retiredAt) has reached grace are dropped after +// this final drain. +func drainRetiredSlots(retired []retiredSlot, rows *[]MatrixRow, now time.Time, grace time.Duration) []retiredSlot { keep := retired[:0] for _, r := range retired { *rows = appendDrainedRow(*rows, r.slot) - r.remaining-- - if r.remaining > 0 { + if now.Sub(r.retiredAt) < grace { keep = append(keep, r) } } return keep } -// advancePendingPrunes decrements each entry's grace counter; entries -// whose counter reaches zero have their routeID actually pruned from -// the bucket's MemberRoutes. Returns the surviving entries. -func advancePendingPrunes(pending []memberPrune) []memberPrune { +// advancePendingPrunes lets each pending member-prune live until its +// retiredAt+grace passes, then actually prunes the routeID from the +// bucket's MemberRoutes. Returns the entries still inside the grace +// window. +func advancePendingPrunes(pending []memberPrune, now time.Time, grace time.Duration) []memberPrune { keep := pending[:0] for _, p := range pending { - p.remaining-- - if p.remaining > 0 { + if now.Sub(p.retiredAt) < grace { keep = append(keep, p) continue } @@ -639,9 +660,12 @@ func rebuildSorted(tbl *routeTable) []*routeSlot { return out } -// findVirtualBucket returns the existing virtual bucket whose range -// covers (or is closest to the right of) start. Returns nil when no -// virtual bucket exists yet — caller creates one in that case. +// findVirtualBucket returns the existing virtual bucket that covers +// start, preferring an exact range match. If no bucket contains +// start, returns the first aggregate in sortedSlots order so +// over-budget routes collapse into a single global bucket rather than +// fragmenting across many. Returns nil only when no virtual bucket +// exists yet — caller creates one in that case. 
func findVirtualBucket(sorted []*routeSlot, start []byte) *routeSlot { for _, s := range sorted { if !s.Aggregate { diff --git a/keyviz/sampler_test.go b/keyviz/sampler_test.go index a8261ac8e..f45c09639 100644 --- a/keyviz/sampler_test.go +++ b/keyviz/sampler_test.go @@ -285,29 +285,29 @@ func TestRemoveRouteHarvestsPendingCounts(t *testing.T) { // TestRemoveVirtualMemberPrunesMemberRoutes pins Codex P2: when a // coarsened (virtual-bucket) RouteID is removed, the bucket's // MemberRoutes list must eventually drop that ID. Pruning is deferred -// across retainedFlushes cycles so the row attribution stays correct -// while the bucket counters still include the removed route's +// across the wall-clock grace window so the row attribution stays +// correct while the bucket counters still include the removed route's // pre-removal increments — TestRemoveVirtualMemberPruneDeferred pins // the grace-window half of this contract. func TestRemoveVirtualMemberPrunesMemberRoutes(t *testing.T) { t.Parallel() - s := setupVirtualBucketWithThreeMembers(t) + s, clk := setupVirtualBucketWithThreeMembers(t) s.RemoveRoute(2) - for i := 0; i < retainedFlushes; i++ { - s.Observe(3, OpRead, 1, 0) - s.Flush() - } - // One more flush after the grace window to capture the post-prune - // MemberRoutes state in a row. + // Drain at least once within grace, then advance past grace so the + // next Flush actually executes the prune. One more Flush captures + // the post-prune MemberRoutes state in a row. + s.Observe(3, OpRead, 1, 0) + s.Flush() + clk.Advance(s.graceWindow() + time.Second) + s.Observe(3, OpRead, 1, 0) + s.Flush() s.Observe(3, OpRead, 1, 0) s.Flush() cols := s.Snapshot(time.Time{}, time.Time{}) agg := cols[len(cols)-1].Rows[0] - for _, m := range agg.MemberRoutes { - if m == 2 { - t.Fatalf("after grace, removed route 2 still in MemberRoutes: %v", agg.MemberRoutes) - } + if memberRoutesContain(agg.MemberRoutes, 2) { + t.Fatalf("after grace, removed route 2 still in MemberRoutes: %v", agg.MemberRoutes) } if len(agg.MemberRoutes) != 1 || agg.MemberRoutes[0] != 3 { t.Fatalf("MemberRoutes = %v, want [3]", agg.MemberRoutes) @@ -315,15 +315,17 @@ func TestRemoveVirtualMemberPrunesMemberRoutes(t *testing.T) { } // TestRemoveVirtualMemberPruneDeferred pins the deferred-prune -// contract: the removed routeID stays in MemberRoutes for the -// retainedFlushes grace window so flushed rows whose counters still -// include that route's pre-removal increments attribute the traffic -// correctly. +// contract: the removed routeID stays in MemberRoutes throughout the +// wall-clock grace window so flushed rows whose counters still include +// that route's pre-removal increments attribute the traffic correctly. func TestRemoveVirtualMemberPruneDeferred(t *testing.T) { t.Parallel() - s := setupVirtualBucketWithThreeMembers(t) + s, _ := setupVirtualBucketWithThreeMembers(t) s.RemoveRoute(2) - for i := 0; i < retainedFlushes; i++ { + // Two flushes inside the grace window — the clock is not advanced, + // so each Flush sees now-retiredAt == 0 < grace and keeps the + // prune entry. 
+ for i := 0; i < 2; i++ { s.Observe(3, OpRead, 1, 0) s.Flush() col := lastSnapshotColumn(t, s) @@ -352,9 +354,9 @@ func memberRoutesContain(members []uint64, target uint64) bool { return false } -func setupVirtualBucketWithThreeMembers(t *testing.T) *MemSampler { +func setupVirtualBucketWithThreeMembers(t *testing.T) (*MemSampler, *fakeClock) { t.Helper() - s, _ := newTestSampler(t, MemSamplerOptions{ + s, clk := newTestSampler(t, MemSamplerOptions{ Step: time.Second, HistoryColumns: 4, MaxTrackedRoutes: 1, @@ -368,7 +370,7 @@ func setupVirtualBucketWithThreeMembers(t *testing.T) *MemSampler { if s.RegisterRoute(3, []byte("e"), []byte("g")) { t.Fatal("route 3 should fold (over budget)") } - return s + return s, clk } // TestRegisterDoesNotRaceFlushOnVirtualBucket pins Codex P1: folding @@ -538,15 +540,15 @@ func TestFlushSortsMixedLiveAndRetiredRows(t *testing.T) { } // TestRetiredSlotGracePeriod asserts that a retired slot stays in the -// drain queue for `retainedFlushes` cycles so an Observe goroutine +// drain queue for the wall-clock grace window so an Observe goroutine // that loaded the pre-RemoveRoute table snapshot can complete its // atomic.Add into the slot and still have the increment harvested by // a subsequent Flush. We simulate the late writer by reaching into -// the retired-slot queue directly and bumping the counter between the -// two flushes. +// the retired-slot queue directly and bumping the counter between +// flushes inside the grace window. func TestRetiredSlotGracePeriod(t *testing.T) { t.Parallel() - s, _ := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 8}) + s, clk := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 8}) if !s.RegisterRoute(1, []byte("a"), []byte("b")) { t.Fatal("RegisterRoute(1) returned false") } @@ -554,15 +556,17 @@ func TestRetiredSlotGracePeriod(t *testing.T) { s.RemoveRoute(1) lateSlot := graceQueueSingleSlot(t, s) - s.Flush() + s.Flush() // drain inside grace lateSlot.reads.Add(7) - s.Flush() + s.Flush() // drain inside grace, harvests the late-writer increment total := totalReadsForRoute(s.Snapshot(time.Time{}, time.Time{}), 1) if total != 1+7 { t.Fatalf("expected reads 8 (pre-remove + late-writer), got %d", total) } + clk.Advance(s.graceWindow() + time.Second) + s.Flush() // last drain, retiredAt+grace now passed → entry dropped s.retiredMu.Lock() leftover := len(s.retiredSlots) s.retiredMu.Unlock() @@ -682,3 +686,32 @@ func TestSnapshotReturnsDeepCopy(t *testing.T) { t.Fatalf("snapshot bounds aliased live state: start=%q end=%q", r.Start, r.End) } } + +// BenchmarkObserveHit pins the hot-path properties claimed in the +// package doc: a single atomic.Pointer.Load, a map lookup, and at +// most two atomic.AddUint64 calls — no allocation, no mutex. Use +// `go test -bench=BenchmarkObserveHit -benchmem ./keyviz/...` to +// catch regressions before they reach the coordinator wiring PR. +func BenchmarkObserveHit(b *testing.B) { + s := NewMemSampler(MemSamplerOptions{Step: time.Second, HistoryColumns: 4}) + if !s.RegisterRoute(1, []byte("a"), []byte("b")) { + b.Fatal("RegisterRoute(1) returned false") + } + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.Observe(1, OpRead, 16, 64) + } +} + +// BenchmarkObserveMiss exercises the unknown-route path so a future +// regression that grows allocations on misses (e.g. virtualForRoute +// fallback path) is caught. 
+func BenchmarkObserveMiss(b *testing.B) { + s := NewMemSampler(MemSamplerOptions{Step: time.Second, HistoryColumns: 4}) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.Observe(99, OpRead, 16, 64) + } +} From f9f7d59ac9d2b8cddcf153b60ea2131bfe4258b2 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 02:11:53 +0900 Subject: [PATCH 08/13] keyviz: cap bucket MemberRoutes + clear retired tails MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex round-7 P2 #1: foldIntoBucket appended every over-budget route ID to bucket.MemberRoutes with no bound. Per-column payload size scaled with total folded routes — defeating the MaxTrackedRoutes cap when total routes greatly exceed it. Add MaxMemberRoutesPerSlot (default 256, configurable via MemSamplerOptions). Past the cap the bucket counters still absorb the route's traffic, but the routeID is not appended. TestMemberRoutesCappedAtConfiguredCap pins both halves. Codex round-7 P2 #2: drainRetiredSlots and advancePendingPrunes compacted via keep := s[:0]; the dropped tail of the backing array retained *routeSlot / *bucket pointers, keeping released slots GC-reachable through the reused capacity. Add clearTail / clearPruneTail helpers that zero the dropped tail before returning. Regression test TestRetiredTailClearedAfterDrop holds a header into the original backing array and verifies index 0 is nilled after the drop. --- keyviz/sampler.go | 55 +++++++++++++++++++++++++++++----- keyviz/sampler_test.go | 67 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+), 8 deletions(-) diff --git a/keyviz/sampler.go b/keyviz/sampler.go index 19f18ed8e..f6eff658e 100644 --- a/keyviz/sampler.go +++ b/keyviz/sampler.go @@ -67,9 +67,10 @@ type Sampler interface { // Defaults for MemSamplerOptions when fields are left zero. const ( - DefaultStep = 60 * time.Second - DefaultHistoryColumns = 1440 // 24 hours at 60s steps. - DefaultMaxTrackedRoutes = 10_000 + DefaultStep = 60 * time.Second + DefaultHistoryColumns = 1440 // 24 hours at 60s steps. + DefaultMaxTrackedRoutes = 10_000 + DefaultMaxMemberRoutesPerSlot = 256 ) // MemSamplerOptions configures NewMemSampler. Zero values fall back to @@ -86,6 +87,14 @@ type MemSamplerOptions struct { // returns false past this cap, the route ID maps into a virtual // bucket, and Snapshot reports it with Aggregate=true. MaxTrackedRoutes int + // MaxMemberRoutesPerSlot caps how many distinct RouteIDs a single + // virtual bucket records in MemberRoutes. Beyond this cap the + // route still folds into the bucket counters (so traffic is not + // dropped) but the routeID is not appended — keeping per-column + // payload size bounded when total routes far exceed + // MaxTrackedRoutes. Snapshot consumers should treat the list as + // "first N members" rather than authoritative attribution. + MaxMemberRoutesPerSlot int // Now overrides time.Now for tests; nil falls back to time.Now. 
Now func() time.Time } @@ -267,6 +276,9 @@ func NewMemSampler(opts MemSamplerOptions) *MemSampler { if opts.MaxTrackedRoutes == 0 { opts.MaxTrackedRoutes = DefaultMaxTrackedRoutes } + if opts.MaxMemberRoutesPerSlot <= 0 { + opts.MaxMemberRoutesPerSlot = DefaultMaxMemberRoutesPerSlot + } now := opts.Now if now == nil { now = time.Now @@ -371,7 +383,7 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool { } next.sortedSlots = appendSorted(next.sortedSlots, bucket) } else { - foldIntoBucket(next, bucket, routeID, start, end) + s.foldIntoBucket(next, bucket, routeID, start, end) } next.virtualForRoute[routeID] = bucket s.table.Store(next) @@ -385,9 +397,15 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool { // Counters live next to the metadata but are protected by their own // atomic ops, not metaMu. If Start is lowered, sortedSlots is rebuilt // to preserve Flush's key-order contract. -func foldIntoBucket(next *routeTable, bucket *routeSlot, routeID uint64, start, end []byte) { +// +// MemberRoutes growth is capped by MaxMemberRoutesPerSlot — beyond +// that cap the bucket counters still absorb the route's traffic, but +// the routeID is not added to the visible member list. +func (s *MemSampler) foldIntoBucket(next *routeTable, bucket *routeSlot, routeID uint64, start, end []byte) { bucket.metaMu.Lock() - bucket.MemberRoutes = append(bucket.MemberRoutes, routeID) + if len(bucket.MemberRoutes) < s.opts.MaxMemberRoutesPerSlot { + bucket.MemberRoutes = append(bucket.MemberRoutes, routeID) + } if len(end) == 0 || (len(bucket.End) != 0 && bytesGT(end, bucket.End)) { bucket.End = cloneBytes(end) } @@ -502,7 +520,9 @@ func (s *MemSampler) Flush() { // entries whose grace window has not yet elapsed. Rows are appended // to *rows so the caller sees the slice growth. Entries whose // elapsed time (now - retiredAt) has reached grace are dropped after -// this final drain. +// this final drain. The dropped tail of the backing array is zeroed +// so released *routeSlot pointers do not stay GC-reachable through +// the reused capacity. func drainRetiredSlots(retired []retiredSlot, rows *[]MatrixRow, now time.Time, grace time.Duration) []retiredSlot { keep := retired[:0] for _, r := range retired { @@ -511,13 +531,15 @@ func drainRetiredSlots(retired []retiredSlot, rows *[]MatrixRow, now time.Time, keep = append(keep, r) } } + clearTail(retired, len(keep)) return keep } // advancePendingPrunes lets each pending member-prune live until its // retiredAt+grace passes, then actually prunes the routeID from the // bucket's MemberRoutes. Returns the entries still inside the grace -// window. +// window. Like drainRetiredSlots, the dropped tail is zeroed so +// released bucket pointers don't linger via the reused capacity. func advancePendingPrunes(pending []memberPrune, now time.Time, grace time.Duration) []memberPrune { keep := pending[:0] for _, p := range pending { @@ -527,9 +549,26 @@ func advancePendingPrunes(pending []memberPrune, now time.Time, grace time.Durat } pruneMemberRoute(p.bucket, p.routeID) } + clearPruneTail(pending, len(keep)) return keep } +// clearTail zeroes the [keepLen, len(s)) range of s so dropped entries +// don't keep their *routeSlot pointers GC-reachable through the +// reused backing array. +func clearTail(s []retiredSlot, keepLen int) { + for i := keepLen; i < len(s); i++ { + s[i] = retiredSlot{} + } +} + +// clearPruneTail is clearTail for the pendingPrunes queue. 
+func clearPruneTail(s []memberPrune, keepLen int) { + for i := keepLen; i < len(s); i++ { + s[i] = memberPrune{} + } +} + // bucketStillReferenced reports whether any RouteID in // virtualForRoute still maps to bucket. Used by RemoveRoute to detect // when a virtual bucket has lost its last member and must be retired diff --git a/keyviz/sampler_test.go b/keyviz/sampler_test.go index f45c09639..7e35fa4e9 100644 --- a/keyviz/sampler_test.go +++ b/keyviz/sampler_test.go @@ -687,6 +687,73 @@ func TestSnapshotReturnsDeepCopy(t *testing.T) { } } +// TestMemberRoutesCappedAtConfiguredCap pins Codex round-7 P2: per- +// bucket MemberRoutes growth is bounded by MaxMemberRoutesPerSlot, so +// flushed columns don't scale with total folded routes when the +// deployment route count vastly exceeds MaxTrackedRoutes. +func TestMemberRoutesCappedAtConfiguredCap(t *testing.T) { + t.Parallel() + s, _ := newTestSampler(t, MemSamplerOptions{ + Step: time.Second, + HistoryColumns: 4, + MaxTrackedRoutes: 1, + MaxMemberRoutesPerSlot: 3, + }) + mustRegister(t, s, 1, "a", "b") + for i := uint64(2); i < 10; i++ { + key := []byte{byte('a' + i)} + if s.RegisterRoute(i, key, append(key, 'z')) { + t.Fatalf("route %d should fold (over budget)", i) + } + s.Observe(i, OpRead, 1, 0) + } + s.Flush() + rows := lastSnapshotColumn(t, s).Rows + agg := findAggregateRow(t, rows) + if len(agg.MemberRoutes) > 3 { + t.Fatalf("MemberRoutes exceeds cap=3: %v", agg.MemberRoutes) + } + // All 8 over-budget routes still drove the bucket counters even + // though only the first 3 are recorded as members. + if agg.Reads != 8 { + t.Fatalf("bucket Reads = %d, want 8 (counters must absorb traffic past cap)", agg.Reads) + } +} + +// TestRetiredTailClearedAfterDrop pins Codex round-7 P2: after a +// retired slot's grace expires and drainRetiredSlots drops the entry, +// the *routeSlot pointer in the dropped tail of the backing array +// must be zeroed so it doesn't keep the slot GC-reachable through +// the reused capacity. +func TestRetiredTailClearedAfterDrop(t *testing.T) { + t.Parallel() + s, clk := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 4}) + mustRegister(t, s, 1, "a", "b") + s.Observe(1, OpRead, 1, 1) + s.RemoveRoute(1) + + s.retiredMu.Lock() + if len(s.retiredSlots) != 1 { + s.retiredMu.Unlock() + t.Fatalf("expected 1 retired slot pre-flush, got %d", len(s.retiredSlots)) + } + orig := s.retiredSlots[:1:1] // slice header pinning index 0 of the backing array + s.retiredMu.Unlock() + + clk.Advance(s.graceWindow() + time.Second) + s.Flush() + + s.retiredMu.Lock() + leftover := len(s.retiredSlots) + s.retiredMu.Unlock() + if leftover != 0 { + t.Fatalf("expected drain to drop entry, len=%d", leftover) + } + if orig[0].slot != nil { + t.Fatal("dropped retiredSlot.slot pointer not zeroed in backing array — GC retention leak") + } +} + // BenchmarkObserveHit pins the hot-path properties claimed in the // package doc: a single atomic.Pointer.Load, a map lookup, and at // most two atomic.AddUint64 calls — no allocation, no mutex. Use From 0f20eaefad9f1bd5ec9ffca8dd60e70ea8b578b6 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 02:19:39 +0900 Subject: [PATCH 09/13] keyviz: clamp non-positive MaxTrackedRoutes + Step accessor test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex round-8 P2: NewMemSampler only defaulted MaxTrackedRoutes when the input was exactly 0. A negative configured value (e.g. 
via a CLI flag/env typo) passed through untouched, and RegisterRoute's `len(next.slots) < s.opts.MaxTrackedRoutes` then evaluated false for every route, silently forcing all traffic into virtual buckets and losing per-route fidelity. Switch to <=0 like HistoryColumns and MaxMemberRoutesPerSlot. Add tests: - TestNonPositiveOptionsFallBackToDefaults — sweeps 0, -1, and a large negative through HistoryColumns / MaxTrackedRoutes / MaxMemberRoutesPerSlot and confirms each lands on its Default* constant. - TestStepAccessor — covers the previously-untested Step() accessor including the typed-nil fallback to DefaultStep. --- keyviz/sampler.go | 2 +- keyviz/sampler_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/keyviz/sampler.go b/keyviz/sampler.go index f6eff658e..bb206905a 100644 --- a/keyviz/sampler.go +++ b/keyviz/sampler.go @@ -273,7 +273,7 @@ func NewMemSampler(opts MemSamplerOptions) *MemSampler { if opts.HistoryColumns <= 0 { opts.HistoryColumns = DefaultHistoryColumns } - if opts.MaxTrackedRoutes == 0 { + if opts.MaxTrackedRoutes <= 0 { opts.MaxTrackedRoutes = DefaultMaxTrackedRoutes } if opts.MaxMemberRoutesPerSlot <= 0 { diff --git a/keyviz/sampler_test.go b/keyviz/sampler_test.go index 7e35fa4e9..055eb7e65 100644 --- a/keyviz/sampler_test.go +++ b/keyviz/sampler_test.go @@ -687,6 +687,48 @@ func TestSnapshotReturnsDeepCopy(t *testing.T) { } } +// TestNonPositiveOptionsFallBackToDefaults pins Codex round-8 P2: a +// negative MaxTrackedRoutes used to bypass the zero-check and force +// every route into a virtual bucket. Confirm both zero and negative +// inputs land on the documented defaults so a bad CLI/env value +// doesn't silently destroy keyviz fidelity. +func TestNonPositiveOptionsFallBackToDefaults(t *testing.T) { + t.Parallel() + for _, val := range []int{0, -1, -10_000} { + s, _ := newTestSampler(t, MemSamplerOptions{ + Step: time.Second, + HistoryColumns: val, + MaxTrackedRoutes: val, + MaxMemberRoutesPerSlot: val, + }) + if s.opts.HistoryColumns != DefaultHistoryColumns { + t.Fatalf("HistoryColumns=%d → %d, want default %d", val, s.opts.HistoryColumns, DefaultHistoryColumns) + } + if s.opts.MaxTrackedRoutes != DefaultMaxTrackedRoutes { + t.Fatalf("MaxTrackedRoutes=%d → %d, want default %d", val, s.opts.MaxTrackedRoutes, DefaultMaxTrackedRoutes) + } + if s.opts.MaxMemberRoutesPerSlot != DefaultMaxMemberRoutesPerSlot { + t.Fatalf("MaxMemberRoutesPerSlot=%d → %d, want default %d", val, s.opts.MaxMemberRoutesPerSlot, DefaultMaxMemberRoutesPerSlot) + } + } +} + +// TestStepAccessor pins the Step() accessor contract: returns the +// configured Step (after defaulting) for a constructed sampler, and +// returns DefaultStep for a typed nil so callers wiring RunFlusher +// against a disabled sampler don't crash. 
+func TestStepAccessor(t *testing.T) { + t.Parallel() + s, _ := newTestSampler(t, MemSamplerOptions{Step: 250 * time.Millisecond}) + if got := s.Step(); got != 250*time.Millisecond { + t.Fatalf("Step() = %v, want 250ms", got) + } + var nilSampler *MemSampler + if got := nilSampler.Step(); got != DefaultStep { + t.Fatalf("nil Step() = %v, want DefaultStep %v", got, DefaultStep) + } +} + // TestMemberRoutesCappedAtConfiguredCap pins Codex round-7 P2: per- // bucket MemberRoutes growth is bounded by MaxMemberRoutesPerSlot, so // flushed columns don't scale with total folded routes when the From 541bc92dd7f7d35d359cc1da3b7617c088e04513 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 02:30:28 +0900 Subject: [PATCH 10/13] keyviz: cancel pending member-prune on re-registration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex round-9 P2: RegisterRoute remapped a previously removed virtual routeID back into a bucket but never cleared the queued pendingPrunes entry for the same routeID. After grace, advancePendingPrunes still ran the prune, so MemberRoutes lost a route that Observe was actively counting against the bucket — silent under-attribution during route-churn / rebalancing where the same RouteID can disappear and reappear within the grace window. Cancel any pendingPrune with a matching routeID at the top of RegisterRoute (under routesMu, briefly grabbing retiredMu). Add a defensive dedup in foldIntoBucket so a re-fold with the routeID still present in MemberRoutes does not produce a duplicate entry. TestReRegisterDuringPruneGraceCancelsPrune drives Remove → Register → Advance(graceWindow + 1s) → Flush and asserts the routeID is present in MemberRoutes exactly once. --- keyviz/sampler.go | 43 +++++++++++++++++++++++++++++++++++++++++- keyviz/sampler_test.go | 41 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 1 deletion(-) diff --git a/keyviz/sampler.go b/keyviz/sampler.go index bb206905a..1ab71a6cc 100644 --- a/keyviz/sampler.go +++ b/keyviz/sampler.go @@ -342,6 +342,13 @@ func (s *MemSampler) Observe(routeID uint64, op Op, keyLen, valueLen int) { // MaxTrackedRoutes cap was hit and the route was folded into a // virtual aggregate bucket. Idempotent: calling twice with the same // RouteID is a no-op (the original slot stays in place). +// +// If a previous RemoveRoute(routeID) queued a deferred member-prune +// for this same RouteID and the grace window has not elapsed yet, +// that prune is cancelled here — otherwise re-registering during +// route churn would still see the routeID disappear from +// bucket.MemberRoutes once grace expires, despite Observe attributing +// fresh traffic to that ID. func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool { if s == nil { return false @@ -349,6 +356,8 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool { s.routesMu.Lock() defer s.routesMu.Unlock() + s.cancelPendingPrune(routeID) + cur := s.table.Load() if _, ok := cur.slots[routeID]; ok { return true @@ -403,7 +412,8 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool { // the routeID is not added to the visible member list. 
func (s *MemSampler) foldIntoBucket(next *routeTable, bucket *routeSlot, routeID uint64, start, end []byte) { bucket.metaMu.Lock() - if len(bucket.MemberRoutes) < s.opts.MaxMemberRoutesPerSlot { + if !memberRoutesContains(bucket.MemberRoutes, routeID) && + len(bucket.MemberRoutes) < s.opts.MaxMemberRoutesPerSlot { bucket.MemberRoutes = append(bucket.MemberRoutes, routeID) } if len(end) == 0 || (len(bucket.End) != 0 && bytesGT(end, bucket.End)) { @@ -553,6 +563,37 @@ func advancePendingPrunes(pending []memberPrune, now time.Time, grace time.Durat return keep } +// cancelPendingPrune drops any pendingPrune entries matching routeID, +// stopping a deferred prune from executing after the route has been +// re-registered inside the grace window. Holds retiredMu briefly off +// the hot path; callers must already hold routesMu so the +// cancellation pairs atomically with the route-table mutation. +func (s *MemSampler) cancelPendingPrune(routeID uint64) { + s.retiredMu.Lock() + defer s.retiredMu.Unlock() + keep := s.pendingPrunes[:0] + for _, p := range s.pendingPrunes { + if p.routeID == routeID { + continue + } + keep = append(keep, p) + } + clearPruneTail(s.pendingPrunes, len(keep)) + s.pendingPrunes = keep +} + +// memberRoutesContains reports whether routeID is already listed in +// members. Used as a dedup guard so re-registering a routeID inside +// the prune grace window doesn't add a duplicate MemberRoutes entry. +func memberRoutesContains(members []uint64, routeID uint64) bool { + for _, m := range members { + if m == routeID { + return true + } + } + return false +} + // clearTail zeroes the [keepLen, len(s)) range of s so dropped entries // don't keep their *routeSlot pointers GC-reachable through the // reused backing array. diff --git a/keyviz/sampler_test.go b/keyviz/sampler_test.go index 055eb7e65..c6fb20525 100644 --- a/keyviz/sampler_test.go +++ b/keyviz/sampler_test.go @@ -687,6 +687,47 @@ func TestSnapshotReturnsDeepCopy(t *testing.T) { } } +// TestReRegisterDuringPruneGraceCancelsPrune pins Codex round-9 P2: +// a virtual-member route that gets removed and re-registered inside +// the prune grace window must not have its routeID stripped from +// bucket.MemberRoutes when grace expires — the route is alive again +// and Observe is feeding fresh traffic into the bucket. +func TestReRegisterDuringPruneGraceCancelsPrune(t *testing.T) { + t.Parallel() + s, clk := newTestSampler(t, MemSamplerOptions{ + Step: time.Second, + HistoryColumns: 4, + MaxTrackedRoutes: 1, + }) + mustRegister(t, s, 1, "a", "b") + if s.RegisterRoute(2, []byte("c"), []byte("d")) { + t.Fatal("route 2 should fold into virtual bucket") + } + s.RemoveRoute(2) + if s.RegisterRoute(2, []byte("c"), []byte("d")) { + t.Fatal("route 2 should still fold (over budget)") + } + + clk.Advance(s.graceWindow() + time.Second) + s.Observe(2, OpRead, 0, 0) + s.Flush() + + rows := lastSnapshotColumn(t, s).Rows + agg := findAggregateRow(t, rows) + if !memberRoutesContain(agg.MemberRoutes, 2) { + t.Fatalf("re-registered route 2 dropped from MemberRoutes after grace: %v", agg.MemberRoutes) + } + count := 0 + for _, m := range agg.MemberRoutes { + if m == 2 { + count++ + } + } + if count != 1 { + t.Fatalf("MemberRoutes contains route 2 %d times, want exactly 1: %v", count, agg.MemberRoutes) + } +} + // TestNonPositiveOptionsFallBackToDefaults pins Codex round-8 P2: a // negative MaxTrackedRoutes used to bypass the zero-check and force // every route into a virtual bucket. 
Confirm both zero and negative
From 41ab078d146d79c0564766a2cffc8227828ae310 Mon Sep 17 00:00:00 2001
From: "Yoshiaki Ueda (bootjp)" 
Date: Sun, 26 Apr 2026 02:34:21 +0900
Subject: [PATCH 11/13] keyviz: stdlib bytes.Compare + doc/test cleanup

Round-10 CodeRabbit nits (all non-blocking polish on top of round-9):

- Replace hand-rolled bytesLT/LE/GT/GE with bytes.Compare wrappers so
  we share the asm-optimized stdlib path and shrink four comparators
  to one-liners.
- RegisterRoute coarsening comment said "closest-by-start virtual
  bucket"; the actual rule is "first bucket containing start, else
  first aggregate in sortedSlots order." Rewrite to match
  findVirtualBucket's selection.
- TestRemoveVirtualMemberPrunesMemberRoutes used
  cols[len(cols)-1].Rows[0]; switch to findAggregateRow so a future
  setup change can't silently shift the assertion onto the wrong row.
- Rename setupVirtualBucketWithThreeMembers (which actually creates
  one individual + a 2-member bucket) to
  setupOneIndividualPlusVirtualBucket so callers don't get misled.
---
 keyviz/sampler.go      | 24 ++++++++++--------------
 keyviz/sampler_test.go |  9 ++++-----
 2 files changed, 14 insertions(+), 19 deletions(-)

diff --git a/keyviz/sampler.go b/keyviz/sampler.go
index 1ab71a6cc..9e17a5d17 100644
--- a/keyviz/sampler.go
+++ b/keyviz/sampler.go
@@ -30,6 +30,7 @@
 package keyviz
 
 import (
+	"bytes"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -379,8 +380,11 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool {
 		return true
 	}
 
-	// Coarsening: route is folded into the closest-by-start virtual
-	// bucket (or one is created if no virtual bucket exists yet).
+	// Coarsening: prefer the first virtual bucket whose [Start, End)
+	// covers `start`; if none does, fall back to the first aggregate
+	// in sortedSlots order (lowest Start). A new bucket is created
+	// only when no virtual bucket exists yet. See findVirtualBucket
+	// for the exact selection.
 	bucket := findVirtualBucket(next.sortedSlots, start)
 	if bucket == nil {
 		bucket = &routeSlot{
@@ -772,15 +776,7 @@ func cloneBytes(b []byte) []byte {
 	return out
 }
 
-func bytesLT(a, b []byte) bool {
-	for i := 0; i < len(a) && i < len(b); i++ {
-		if a[i] != b[i] {
-			return a[i] < b[i]
-		}
-	}
-	return len(a) < len(b)
-}
-
-func bytesLE(a, b []byte) bool { return !bytesGT(a, b) }
-func bytesGE(a, b []byte) bool { return !bytesLT(a, b) }
-func bytesGT(a, b []byte) bool { return bytesLT(b, a) }
+func bytesLT(a, b []byte) bool { return bytes.Compare(a, b) < 0 }
+func bytesLE(a, b []byte) bool { return bytes.Compare(a, b) <= 0 }
+func bytesGT(a, b []byte) bool { return bytes.Compare(a, b) > 0 }
+func bytesGE(a, b []byte) bool { return bytes.Compare(a, b) >= 0 }
diff --git a/keyviz/sampler_test.go b/keyviz/sampler_test.go
index c6fb20525..5a209ee85 100644
--- a/keyviz/sampler_test.go
+++ b/keyviz/sampler_test.go
@@ -291,7 +291,7 @@ func TestRemoveRouteHarvestsPendingCounts(t *testing.T) {
 // the grace-window half of this contract.
 func TestRemoveVirtualMemberPrunesMemberRoutes(t *testing.T) {
 	t.Parallel()
-	s, clk := setupVirtualBucketWithThreeMembers(t)
+	s, clk := setupOneIndividualPlusVirtualBucket(t)
 	s.RemoveRoute(2)
 	// Drain at least once within grace, then advance past grace so the
 	// next Flush actually executes the prune.
One more Flush captures @@ -304,8 +304,7 @@ func TestRemoveVirtualMemberPrunesMemberRoutes(t *testing.T) { s.Observe(3, OpRead, 1, 0) s.Flush() - cols := s.Snapshot(time.Time{}, time.Time{}) - agg := cols[len(cols)-1].Rows[0] + agg := findAggregateRow(t, lastSnapshotColumn(t, s).Rows) if memberRoutesContain(agg.MemberRoutes, 2) { t.Fatalf("after grace, removed route 2 still in MemberRoutes: %v", agg.MemberRoutes) } @@ -320,7 +319,7 @@ func TestRemoveVirtualMemberPrunesMemberRoutes(t *testing.T) { // that route's pre-removal increments attribute the traffic correctly. func TestRemoveVirtualMemberPruneDeferred(t *testing.T) { t.Parallel() - s, _ := setupVirtualBucketWithThreeMembers(t) + s, _ := setupOneIndividualPlusVirtualBucket(t) s.RemoveRoute(2) // Two flushes inside the grace window — the clock is not advanced, // so each Flush sees now-retiredAt == 0 < grace and keeps the @@ -354,7 +353,7 @@ func memberRoutesContain(members []uint64, target uint64) bool { return false } -func setupVirtualBucketWithThreeMembers(t *testing.T) (*MemSampler, *fakeClock) { +func setupOneIndividualPlusVirtualBucket(t *testing.T) (*MemSampler, *fakeClock) { t.Helper() s, clk := newTestSampler(t, MemSamplerOptions{ Step: time.Second, From 25061d68766d78677512d74df1475c13ddb23656 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 02:44:53 +0900 Subject: [PATCH 12/13] keyviz: synthetic virtual bucket IDs + scoped prune cancel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex round-11 P2 #1: cancelPendingPrune ran unconditionally at the top of RegisterRoute. When a removed virtual member rejoined as an individual slot (capacity freed up), the cancel suppressed the only cleanup path for the old bucket's MemberRoutes — leaving the routeID listed on the bucket forever despite no longer contributing traffic. Replace with cancelPendingPruneFor(bucket, routeID), called only after we know the route is rejoining the same bucket; prunes against other buckets (or where the route lands as an individual slot) fire normally and clean up the old bucket's metadata. Codex round-11 P2 #2: a freshly created virtual bucket stamped its RouteID with the first folded real route ID. If that real route was later removed and re-registered as an individual slot, flushed columns could show two rows with the same RouteID — one aggregate, one individual. Add an atomic.Uint64 virtualIDCounter on MemSampler and hand out synthetic IDs from the high end of uint64 (MaxUint64, MaxUint64-1, …). The synthetic space cannot collide with real route IDs (assigned from the low end by the coordinator), so row identity stays unambiguous under churn. Claude bot nit: rename the local `bytes` variable in Observe to `byteCount` so it no longer shadows the new `bytes` stdlib import. Tests: TestVirtualBucketRouteIDIsSynthetic + TestRejoinAsIndividualLetsBucketPruneFire. --- keyviz/sampler.go | 74 +++++++++++++++++++++++++++++------------- keyviz/sampler_test.go | 69 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 121 insertions(+), 22 deletions(-) diff --git a/keyviz/sampler.go b/keyviz/sampler.go index 9e17a5d17..22d08c19e 100644 --- a/keyviz/sampler.go +++ b/keyviz/sampler.go @@ -135,6 +135,14 @@ type MemSampler struct { retiredMu sync.Mutex retiredSlots []retiredSlot pendingPrunes []memberPrune + + // virtualIDCounter hands out synthetic RouteIDs for new virtual + // buckets, starting at MaxUint64 and decrementing. 
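	// Illustrative sketch (not lines of this patch): the decrement
	// pattern in isolation. atomic.Uint64's zero value is 0, and
	// Add(^uint64(0)) wraps modulo 2^64, so the first call returns
	// math.MaxUint64 and each subsequent call steps down by one:
	//
	//	var ctr atomic.Uint64
	//	a := ctr.Add(^uint64(0)) // a == math.MaxUint64
	//	b := ctr.Add(^uint64(0)) // b == math.MaxUint64 - 1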
The synthetic + // space cannot collide with real route IDs (which the coordinator + // assigns from the low end), so a real RouteID can never appear + // twice in the same column — once as an aggregate row, once as + // an individual row — even under register/remove churn. + virtualIDCounter atomic.Uint64 } // retiredSlot tracks a removed slot through its post-removal grace @@ -317,23 +325,23 @@ func (s *MemSampler) Observe(routeID uint64, op Op, keyLen, valueLen int) { return } } - bytes := uint64(0) + byteCount := uint64(0) if keyLen > 0 { - bytes += uint64(keyLen) + byteCount += uint64(keyLen) } if valueLen > 0 { - bytes += uint64(valueLen) + byteCount += uint64(valueLen) } switch op { case OpRead: slot.reads.Add(1) - if bytes > 0 { - slot.readBytes.Add(bytes) + if byteCount > 0 { + slot.readBytes.Add(byteCount) } case OpWrite: slot.writes.Add(1) - if bytes > 0 { - slot.writeBytes.Add(bytes) + if byteCount > 0 { + slot.writeBytes.Add(byteCount) } } } @@ -345,11 +353,12 @@ func (s *MemSampler) Observe(routeID uint64, op Op, keyLen, valueLen int) { // RouteID is a no-op (the original slot stays in place). // // If a previous RemoveRoute(routeID) queued a deferred member-prune -// for this same RouteID and the grace window has not elapsed yet, -// that prune is cancelled here — otherwise re-registering during -// route churn would still see the routeID disappear from -// bucket.MemberRoutes once grace expires, despite Observe attributing -// fresh traffic to that ID. +// and the route is now re-registered into the SAME bucket inside the +// grace window, that prune is cancelled — otherwise the routeID +// would disappear from bucket.MemberRoutes despite Observe still +// attributing fresh traffic to it. Prunes for different buckets (or +// when the route rejoins as an individual slot) are left alone so +// the old bucket's MemberRoutes is correctly cleaned up. func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool { if s == nil { return false @@ -357,8 +366,6 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool { s.routesMu.Lock() defer s.routesMu.Unlock() - s.cancelPendingPrune(routeID) - cur := s.table.Load() if _, ok := cur.slots[routeID]; ok { return true @@ -388,7 +395,7 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool { bucket := findVirtualBucket(next.sortedSlots, start) if bucket == nil { bucket = &routeSlot{ - RouteID: routeID, + RouteID: s.nextVirtualBucketID(), Start: cloneBytes(start), End: cloneBytes(end), Aggregate: true, @@ -398,6 +405,11 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool { } else { s.foldIntoBucket(next, bucket, routeID, start, end) } + // The route is rejoining `bucket`; cancel any deferred prune for + // this same routeID against this same bucket so it stays in + // MemberRoutes. Prunes against other buckets are intentionally + // left in place. + s.cancelPendingPruneFor(bucket, routeID) next.virtualForRoute[routeID] = bucket s.table.Store(next) return false @@ -567,17 +579,23 @@ func advancePendingPrunes(pending []memberPrune, now time.Time, grace time.Durat return keep } -// cancelPendingPrune drops any pendingPrune entries matching routeID, -// stopping a deferred prune from executing after the route has been -// re-registered inside the grace window. Holds retiredMu briefly off -// the hot path; callers must already hold routesMu so the -// cancellation pairs atomically with the route-table mutation. 
-func (s *MemSampler) cancelPendingPrune(routeID uint64) { +// cancelPendingPruneFor drops any pendingPrune entry whose routeID +// AND bucket pointer both match — i.e. the same routeID is rejoining +// the same bucket it was just removed from. This stops a deferred +// prune from removing the routeID from MemberRoutes despite Observe +// still attributing traffic to it. Prunes against other buckets (or +// where the route rejoins as an individual slot) are left in place +// so the old bucket's MemberRoutes is correctly cleaned up. +// +// Holds retiredMu briefly off the hot path; callers must already +// hold routesMu so the cancellation pairs atomically with the +// route-table mutation. +func (s *MemSampler) cancelPendingPruneFor(bucket *routeSlot, routeID uint64) { s.retiredMu.Lock() defer s.retiredMu.Unlock() keep := s.pendingPrunes[:0] for _, p := range s.pendingPrunes { - if p.routeID == routeID { + if p.routeID == routeID && p.bucket == bucket { continue } keep = append(keep, p) @@ -586,6 +604,18 @@ func (s *MemSampler) cancelPendingPrune(routeID uint64) { s.pendingPrunes = keep } +// nextVirtualBucketID returns a synthetic RouteID for a brand-new +// virtual bucket. Synthetic IDs come from the high end of uint64 and +// decrement, so they cannot collide with real route IDs (which are +// assigned from the low end by the coordinator). Without this, +// stamping the bucket with the first folded real RouteID would mean +// that ID could later show up on TWO rows in the same column — one +// aggregate, one individual — if the original route is later +// re-registered as an individual slot. +func (s *MemSampler) nextVirtualBucketID() uint64 { + return s.virtualIDCounter.Add(^uint64(0)) // subtract 1; counter starts at MaxUint64+1 +} + // memberRoutesContains reports whether routeID is already listed in // members. Used as a dedup guard so re-registering a routeID inside // the prune grace window doesn't add a duplicate MemberRoutes entry. diff --git a/keyviz/sampler_test.go b/keyviz/sampler_test.go index 5a209ee85..622416de4 100644 --- a/keyviz/sampler_test.go +++ b/keyviz/sampler_test.go @@ -686,6 +686,75 @@ func TestSnapshotReturnsDeepCopy(t *testing.T) { } } +// TestVirtualBucketRouteIDIsSynthetic pins Codex round-11 P2: a +// virtual bucket must not stamp itself with a real route ID, or that +// real ID could later show up on TWO rows in the same column — +// once aggregate, once individual — when the original folded route +// is later re-registered as an individual slot. 
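// Illustrative sketch (hypothetical route IDs, not lines of this
// patch): the collision the synthetic namespace rules out. With the
// old stamping, one real ID could label two rows in a column:
//
//	RegisterRoute(7, ...) // over budget: folds; old code stamped the
//	                      //   fresh bucket with RouteID 7
//	RemoveRoute(7)        // later, other removals free capacity
//	RegisterRoute(7, ...) // now fits as an individual slot
//	Flush()               // column held RouteID 7 twice:
//	                      //   Aggregate=true and Aggregate=false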
+func TestVirtualBucketRouteIDIsSynthetic(t *testing.T) { + t.Parallel() + s, _ := newTestSampler(t, MemSamplerOptions{ + Step: time.Second, + HistoryColumns: 4, + MaxTrackedRoutes: 1, + }) + mustRegister(t, s, 1, "a", "b") + if s.RegisterRoute(2, []byte("c"), []byte("d")) { + t.Fatal("route 2 should fold into a fresh virtual bucket") + } + s.Observe(2, OpRead, 0, 0) + s.Flush() + + rows := lastSnapshotColumn(t, s).Rows + agg := findAggregateRow(t, rows) + if agg.RouteID == 2 { + t.Fatalf("aggregate bucket reused real RouteID 2 — synthetic ID space required: %v", agg) + } + for _, r := range rows { + if !r.Aggregate && r.RouteID == agg.RouteID { + t.Fatalf("real RouteID %d collided with aggregate row %v", r.RouteID, agg) + } + } +} + +// TestRejoinAsIndividualLetsBucketPruneFire pins Codex round-11 P2: +// when a removed virtual member rejoins as an individual slot (capacity +// freed up), the deferred member-prune for the old bucket must still +// execute — otherwise the bucket's MemberRoutes keeps a route that no +// longer contributes traffic to it. +func TestRejoinAsIndividualLetsBucketPruneFire(t *testing.T) { + t.Parallel() + s, clk := newTestSampler(t, MemSamplerOptions{ + Step: time.Second, + HistoryColumns: 4, + MaxTrackedRoutes: 2, + }) + mustRegister(t, s, 1, "a", "b") + mustRegister(t, s, 2, "m", "n") + if s.RegisterRoute(3, []byte("y"), []byte("z")) { + t.Fatal("route 3 over budget should fold") + } + // Free capacity: remove route 2 (now slots have room) AND remove 3 + // (queued in pendingPrunes). Re-register 3 — it now fits as an + // individual slot, so the old bucket should still get pruned. + s.RemoveRoute(3) + s.RemoveRoute(2) + if !s.RegisterRoute(3, []byte("y"), []byte("z")) { + t.Fatal("route 3 should now fit as individual slot") + } + + clk.Advance(s.graceWindow() + time.Second) + s.Observe(3, OpRead, 0, 0) + s.Flush() + + rows := lastSnapshotColumn(t, s).Rows + for _, r := range rows { + if r.Aggregate && memberRoutesContain(r.MemberRoutes, 3) { + t.Fatalf("after rejoin-as-individual + grace, bucket still lists route 3: %+v", r) + } + } +} + // TestReRegisterDuringPruneGraceCancelsPrune pins Codex round-9 P2: // a virtual-member route that gets removed and re-registered inside // the prune grace window must not have its routeID stripped from From 4c609062271ee5d6b1fb4d519680da421b64d03d Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 02:55:15 +0900 Subject: [PATCH 13/13] keyviz: reuse retired slot on re-registration + test polish MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex round-12 P2: Re-registering a removed RouteID inside the grace window allocated a brand-new *routeSlot. Flush then drained both the new live slot AND the old retired slot in the same column, producing two rows with the same RouteID and splitting counts between them. Add reclaimRetiredSlot — when RegisterRoute lands a real (non-aggregate) routeID and a retired slot with that ID is queued, pull it off retiredSlots and reuse it. The old counters and any in-flight late-Observe writes converge with new traffic on the same slot, and Flush emits a single row. TestReRegisterIndividualReusesRetiredSlot asserts one row per RouteID per column with correctly accumulated counts. Claude bot nits: - nextVirtualBucketID comment now spells out the atomic.Uint64 wrap-around explicitly instead of the cryptic "MaxUint64+1" form. 
- TestRejoinAsIndividualLetsBucketPruneFire was vacuously true — the bucket had no traffic so it never emitted a row, making the prune assertion unreachable. Restructure with two virtual members (so removing one leaves the bucket alive, not orphaned), drive traffic through the surviving member, and flush twice past grace so the post-prune MemberRoutes lands in a snapshot row. --- keyviz/sampler.go | 54 ++++++++++++++++++++++++++--- keyviz/sampler_test.go | 79 +++++++++++++++++++++++++++++++++--------- 2 files changed, 112 insertions(+), 21 deletions(-) diff --git a/keyviz/sampler.go b/keyviz/sampler.go index 22d08c19e..0d816c08f 100644 --- a/keyviz/sampler.go +++ b/keyviz/sampler.go @@ -376,10 +376,25 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool { next := copyRouteTable(cur) if len(next.slots) < s.opts.MaxTrackedRoutes { - slot := &routeSlot{ - RouteID: routeID, - Start: cloneBytes(start), - End: cloneBytes(end), + slot := s.reclaimRetiredSlot(routeID) + if slot == nil { + slot = &routeSlot{ + RouteID: routeID, + Start: cloneBytes(start), + End: cloneBytes(end), + } + } else { + // Re-registering the same routeID inside the grace window: + // reuse the retired slot so any in-flight Observe writers + // hitting the prior table snapshot land on the same slot + // the new traffic uses, and Flush emits a single row per + // RouteID instead of two (one live + one retired) with + // counts split across them. Refresh the metadata fields to + // match the new registration; counters are preserved. + slot.metaMu.Lock() + slot.Start = cloneBytes(start) + slot.End = cloneBytes(end) + slot.metaMu.Unlock() } next.slots[routeID] = slot next.sortedSlots = appendSorted(next.sortedSlots, slot) @@ -613,7 +628,36 @@ func (s *MemSampler) cancelPendingPruneFor(bucket *routeSlot, routeID uint64) { // aggregate, one individual — if the original route is later // re-registered as an individual slot. func (s *MemSampler) nextVirtualBucketID() uint64 { - return s.virtualIDCounter.Add(^uint64(0)) // subtract 1; counter starts at MaxUint64+1 + // atomic.Uint64 starts at 0; adding ^uint64(0) wraps to MaxUint64 + // on the first call, MaxUint64-1 on the second, etc. + return s.virtualIDCounter.Add(^uint64(0)) +} + +// reclaimRetiredSlot looks for a non-aggregate retired slot whose +// RouteID matches the supplied routeID and, if found, removes it +// from retiredSlots and returns it for reuse. This guarantees a +// route removed and re-registered inside the grace window is +// represented by a single *routeSlot — Flush would otherwise emit +// two rows with the same RouteID (one from the new live slot, one +// from the still-draining retired slot) and split counts across +// them. Aggregate (orphaned virtual bucket) entries are left in +// place because their RouteID lives in the synthetic namespace and +// a real-ID match against an aggregate would be coincidental. 
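// Illustrative sketch (not lines of this patch): the end-to-end
// effect of reclaiming, mirroring the new test added below:
//
//	s.RegisterRoute(1, []byte("a"), []byte("b"))
//	s.Observe(1, OpRead, 0, 0)                   // 1 read on the live slot
//	s.RemoveRoute(1)                             // slot parked in retiredSlots
//	s.RegisterRoute(1, []byte("a"), []byte("b")) // reclaims the same *routeSlot
//	s.Observe(1, OpRead, 0, 0)                   // accumulates on the same counters
//	s.Flush()                                    // one row for RouteID 1, Reads == 2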
+func (s *MemSampler) reclaimRetiredSlot(routeID uint64) *routeSlot { + s.retiredMu.Lock() + defer s.retiredMu.Unlock() + var reclaimed *routeSlot + keep := s.retiredSlots[:0] + for _, r := range s.retiredSlots { + if reclaimed == nil && !r.slot.Aggregate && r.slot.RouteID == routeID { + reclaimed = r.slot + continue + } + keep = append(keep, r) + } + clearTail(s.retiredSlots, len(keep)) + s.retiredSlots = keep + return reclaimed } // memberRoutesContains reports whether routeID is already listed in diff --git a/keyviz/sampler_test.go b/keyviz/sampler_test.go index 622416de4..b41fbc548 100644 --- a/keyviz/sampler_test.go +++ b/keyviz/sampler_test.go @@ -718,41 +718,88 @@ func TestVirtualBucketRouteIDIsSynthetic(t *testing.T) { } // TestRejoinAsIndividualLetsBucketPruneFire pins Codex round-11 P2: -// when a removed virtual member rejoins as an individual slot (capacity -// freed up), the deferred member-prune for the old bucket must still -// execute — otherwise the bucket's MemberRoutes keeps a route that no -// longer contributes traffic to it. +// when a removed virtual member rejoins as an individual slot, the +// deferred member-prune for the old bucket must still execute — +// otherwise the bucket's MemberRoutes keeps a route that no longer +// contributes traffic to it. Set up the bucket with two members so +// removing one leaves the bucket alive (not orphaned), and traffic +// continues to be drained across the grace boundary; the post-grace +// row must show the prune actually happened. func TestRejoinAsIndividualLetsBucketPruneFire(t *testing.T) { t.Parallel() s, clk := newTestSampler(t, MemSamplerOptions{ Step: time.Second, - HistoryColumns: 4, - MaxTrackedRoutes: 2, + HistoryColumns: 8, + MaxTrackedRoutes: 1, }) mustRegister(t, s, 1, "a", "b") - mustRegister(t, s, 2, "m", "n") + if s.RegisterRoute(2, []byte("m"), []byte("n")) { + t.Fatal("route 2 should fold") + } if s.RegisterRoute(3, []byte("y"), []byte("z")) { - t.Fatal("route 3 over budget should fold") + t.Fatal("route 3 should fold (same bucket)") } - // Free capacity: remove route 2 (now slots have room) AND remove 3 - // (queued in pendingPrunes). Re-register 3 — it now fits as an - // individual slot, so the old bucket should still get pruned. + s.Observe(2, OpRead, 0, 0) + s.Observe(3, OpRead, 0, 0) + // Free capacity (route 1) and remove the virtual member (route 3), + // which queues a prune against the bucket. Route 2 still keeps the + // bucket alive in virtualForRoute. + s.RemoveRoute(1) s.RemoveRoute(3) - s.RemoveRoute(2) if !s.RegisterRoute(3, []byte("y"), []byte("z")) { - t.Fatal("route 3 should now fit as individual slot") + t.Fatal("route 3 should fit individually now") } + // Two flushes after grace: the first executes the prune, the + // second emits a row with the post-prune MemberRoutes. clk.Advance(s.graceWindow() + time.Second) - s.Observe(3, OpRead, 0, 0) + s.Observe(2, OpRead, 0, 0) + s.Flush() + s.Observe(2, OpRead, 0, 0) s.Flush() rows := lastSnapshotColumn(t, s).Rows + agg := findAggregateRow(t, rows) + if memberRoutesContain(agg.MemberRoutes, 3) { + t.Fatalf("bucket still lists pruned route 3: %v", agg.MemberRoutes) + } + if !memberRoutesContain(agg.MemberRoutes, 2) { + t.Fatalf("bucket dropped still-active route 2: %v", agg.MemberRoutes) + } +} + +// TestReRegisterIndividualReusesRetiredSlot pins Codex round-12 P2: +// re-registering the same RouteID inside the grace window must reuse +// the retired slot. 
Otherwise Flush emits two rows for the same +// RouteID in one column (live + retired drain), splitting counts. +func TestReRegisterIndividualReusesRetiredSlot(t *testing.T) { + t.Parallel() + s, _ := newTestSampler(t, MemSamplerOptions{ + Step: time.Second, + HistoryColumns: 4, + }) + mustRegister(t, s, 1, "a", "b") + s.Observe(1, OpRead, 0, 0) + s.RemoveRoute(1) + mustRegister(t, s, 1, "a", "b") + s.Observe(1, OpRead, 0, 0) + s.Flush() + + rows := lastSnapshotColumn(t, s).Rows + count := 0 + var total uint64 for _, r := range rows { - if r.Aggregate && memberRoutesContain(r.MemberRoutes, 3) { - t.Fatalf("after rejoin-as-individual + grace, bucket still lists route 3: %+v", r) + if r.RouteID == 1 { + count++ + total += r.Reads } } + if count != 1 { + t.Fatalf("got %d rows for RouteID 1 in one column; expected 1 — retired slot was not reclaimed", count) + } + if total != 2 { + t.Fatalf("total Reads for RouteID 1 = %d, want 2 (1 pre-remove + 1 post-rejoin)", total) + } } // TestReRegisterDuringPruneGraceCancelsPrune pins Codex round-9 P2: