diff --git a/keyviz/flusher.go b/keyviz/flusher.go
new file mode 100644
index 000000000..0be239b35
--- /dev/null
+++ b/keyviz/flusher.go
@@ -0,0 +1,37 @@
+package keyviz
+
+import (
+	"context"
+	"time"
+)
+
+// RunFlusher drives Sampler.Flush at the supplied interval until ctx
+// is cancelled. Returns when ctx fires; the final tick is not
+// executed (a graceful shutdown should call Sampler.Flush once more
+// after RunFlusher returns if it wants to harvest the in-progress
+// step).
+//
+// step <= 0 falls back to DefaultStep.
+//
+// This is a tiny wrapper so call sites in main.go don't need to spell
+// out the ticker boilerplate; the boilerplate is unit-tested once, in
+// this package, rather than at every caller.
+func RunFlusher(ctx context.Context, s *MemSampler, step time.Duration) {
+	if s == nil {
+		<-ctx.Done()
+		return
+	}
+	if step <= 0 {
+		step = DefaultStep
+	}
+	t := time.NewTicker(step)
+	defer t.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-t.C:
+			s.Flush()
+		}
+	}
+}
diff --git a/keyviz/flusher_test.go b/keyviz/flusher_test.go
new file mode 100644
index 000000000..fcff6ab45
--- /dev/null
+++ b/keyviz/flusher_test.go
@@ -0,0 +1,57 @@
+package keyviz
+
+import (
+	"context"
+	"testing"
+	"time"
+)
+
+func TestRunFlusherTicksUntilCancel(t *testing.T) {
+	t.Parallel()
+	s := NewMemSampler(MemSamplerOptions{Step: 5 * time.Millisecond, HistoryColumns: 16})
+	if !s.RegisterRoute(1, []byte("a"), []byte("b")) {
+		t.Fatal("Register failed")
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		RunFlusher(ctx, s, 5*time.Millisecond)
+	}()
+
+	// Drive Observe across ticker firings.
+	for i := 0; i < 10; i++ {
+		s.Observe(1, OpRead, 0, 0)
+		time.Sleep(2 * time.Millisecond)
+	}
+	cancel()
+	select {
+	case <-done:
+	case <-time.After(time.Second):
+		t.Fatal("RunFlusher did not return after cancel")
+	}
+
+	cols := s.Snapshot(time.Time{}, time.Time{})
+	if len(cols) == 0 {
+		t.Fatal("expected at least one column from background flushes")
+	}
+}
+
+// TestRunFlusherNilSamplerWaitsCtx asserts the nil-sampler contract
+// (RunFlusher just blocks on ctx.Done so callers can hard-wire it
+// regardless of whether keyviz is enabled).
+func TestRunFlusherNilSamplerWaitsCtx(t *testing.T) {
+	t.Parallel()
+	ctx, cancel := context.WithCancel(context.Background())
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		RunFlusher(ctx, nil, time.Millisecond)
+	}()
+	cancel()
+	select {
+	case <-done:
+	case <-time.After(time.Second):
+		t.Fatal("RunFlusher(nil) did not return on cancel")
+	}
+}
diff --git a/keyviz/ring_buffer.go b/keyviz/ring_buffer.go
new file mode 100644
index 000000000..a3dcc6162
--- /dev/null
+++ b/keyviz/ring_buffer.go
@@ -0,0 +1,99 @@
+package keyviz
+
+import (
+	"sort"
+	"time"
+)
+
+// ringBuffer is a fixed-capacity circular buffer of MatrixColumn,
+// oldest-first. Push drops the oldest entry once cap is reached.
+// Range returns a half-open [from, to) slice in chronological order.
+//
+// The buffer is not goroutine-safe by itself; callers (MemSampler)
+// guard it with historyMu.
+type ringBuffer struct {
+	cap     int
+	buf     []MatrixColumn // physical storage; the oldest entry is buf[0] until wrapped, buf[pos] after
+	wrapped bool
+	pos     int // next write index once wrapped == true
+}
+
+func newRingBuffer(cap int) *ringBuffer {
+	if cap < 1 {
+		cap = 1
+	}
+	return &ringBuffer{cap: cap, buf: make([]MatrixColumn, 0, cap)}
+}
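+
+// A worked wrap example (hypothetical, cap = 3): pushing columns A, B
+// and C fills the buffer and sets wrapped; a fourth push D overwrites
+// A in place, so snapshotOrdered (and therefore Range) yields B, C, D,
+// oldest first.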
+
+// Push appends a column. Once length reaches cap the oldest entry is
+// dropped (overwritten in place).
+func (r *ringBuffer) Push(col MatrixColumn) {
+	if !r.wrapped {
+		r.buf = append(r.buf, col)
+		if len(r.buf) >= r.cap {
+			r.wrapped = true
+			r.pos = 0
+		}
+		return
+	}
+	r.buf[r.pos] = col
+	r.pos = (r.pos + 1) % r.cap
+}
+
+// Range returns the columns whose At falls in [from, to), oldest
+// first. Either bound may be the zero Time, meaning unbounded on that
+// side. The returned slice is a deep copy: each column has its own
+// Rows slice and each row owns its Start/End byte slices, so callers
+// may mutate any field without corrupting stored history or racing
+// with concurrent flushes.
+func (r *ringBuffer) Range(from, to time.Time) []MatrixColumn {
+	all := r.snapshotOrdered()
+	// snapshotOrdered is already chronologically ordered.
+	lo := 0
+	if !from.IsZero() {
+		lo = sort.Search(len(all), func(i int) bool {
+			return !all[i].At.Before(from)
+		})
+	}
+	hi := len(all)
+	if !to.IsZero() {
+		hi = sort.Search(len(all), func(i int) bool {
+			return !all[i].At.Before(to)
+		})
+	}
+	if lo > hi {
+		return nil
+	}
+	out := make([]MatrixColumn, hi-lo)
+	for i, src := range all[lo:hi] {
+		out[i] = cloneColumn(src)
+	}
+	return out
+}
+
+// cloneColumn returns a deep copy of col: a fresh Rows slice with
+// each row's Start/End and MemberRoutes independently allocated.
+func cloneColumn(col MatrixColumn) MatrixColumn {
+	rows := make([]MatrixRow, len(col.Rows))
+	for i, row := range col.Rows {
+		rows[i] = row
+		rows[i].Start = cloneBytes(row.Start)
+		rows[i].End = cloneBytes(row.End)
+		if len(row.MemberRoutes) > 0 {
+			rows[i].MemberRoutes = append([]uint64(nil), row.MemberRoutes...)
+		}
+	}
+	return MatrixColumn{At: col.At, Rows: rows}
+}
+
+// snapshotOrdered returns a chronologically ordered (oldest first)
+// view of the buffer contents in a fresh slice.
+func (r *ringBuffer) snapshotOrdered() []MatrixColumn {
+	if !r.wrapped {
+		return append([]MatrixColumn(nil), r.buf...)
+	}
+	out := make([]MatrixColumn, 0, r.cap)
+	out = append(out, r.buf[r.pos:]...)
+	out = append(out, r.buf[:r.pos]...)
+	return out
+}
diff --git a/keyviz/sampler.go b/keyviz/sampler.go
new file mode 100644
index 000000000..0d816c08f
--- /dev/null
+++ b/keyviz/sampler.go
@@ -0,0 +1,856 @@
+// Package keyviz implements the in-memory sampler that backs the
+// docs/admin_ui_key_visualizer_design.md heatmap.
+//
+// The sampler sits on the data-plane hot path and counts requests per
+// Raft route. The contract for callers (today, kv.ShardedCoordinator):
+//
+// - Construct a MemSampler with NewMemSampler.
+// - Wire it through the coordinator with a constructor option; the
+//   coordinator calls Sampler.Observe at the dispatch entry, after
+//   resolving the RouteID.
+// - Run Flush every Step in a background goroutine — RunFlusher
+//   does this for you with the supplied step and ctx.
+// - Read the rendered matrix via Snapshot for the Admin gRPC service.
+//
+// Hot-path properties (see design §5.1, §10):
+//
+// - Observe is a single atomic.Pointer[routeTable].Load, a plain map
+//   lookup against an immutable snapshot, and at most two
+//   atomic.AddUint64 calls (one for the count, one for bytes —
+//   skipped when both keyLen and valueLen are zero). No allocation,
+//   no mutex.
+// - Flush drains the per-route counters with atomic.SwapUint64; no
+//   pointer retirement, so a late writer cannot race past the snapshot
+//   and lose counts.
+// - Adding / removing routes (RegisterRoute, RemoveRoute today;
+//   ApplySplit / ApplyMerge in a future PR) builds a fresh
+//   routeTable copy under a non-hot-path mutex and publishes it
+//   with a single atomic.Pointer.Store. Routes mutated mid-step
+//   keep their counters in the new table by design.
+package keyviz
+
+import (
+	"bytes"
+	"sort"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+// Op identifies which counter family Observe should bump.
+type Op uint8
+
+const (
+	OpRead Op = iota
+	OpWrite
+)
+
+// Series selects which counter the matrix response should expose.
+type Series uint8
+
+const (
+	SeriesReads Series = iota
+	SeriesWrites
+	SeriesReadBytes
+	SeriesWriteBytes
+)
+
+// Sampler is the narrow interface the coordinator depends on. The
+// nil-safe contract is documented per-method so a coordinator wired
+// without a sampler compiles to a no-op call.
+type Sampler interface {
+	// Observe records a single request against a route. Op identifies
+	// the counter family. keyLen and valueLen are summed into the
+	// matching *Bytes counter; pass 0 for read-only ops where the
+	// payload size is irrelevant.
+	Observe(routeID uint64, op Op, keyLen, valueLen int)
+}
+
+// Defaults for MemSamplerOptions when fields are left zero.
+const (
+	DefaultStep                   = 60 * time.Second
+	DefaultHistoryColumns         = 1440 // 24 hours at 60s steps.
+	DefaultMaxTrackedRoutes       = 10_000
+	DefaultMaxMemberRoutesPerSlot = 256
+)
+
+// MemSamplerOptions configures NewMemSampler. Zero values fall back to
+// the Default* constants above; passing a struct literal with only the
+// fields you care about is the expected call style.
+type MemSamplerOptions struct {
+	// Step is the flush interval and, equivalently, the ring buffer's
+	// column resolution.
+	Step time.Duration
+	// HistoryColumns caps the ring buffer length. Older columns are
+	// dropped on push when the buffer is full.
+	HistoryColumns int
+	// MaxTrackedRoutes caps the number of routes whose counters are
+	// kept individually before coarsening kicks in. Past this cap,
+	// RegisterRoute returns false, the route ID maps into a virtual
+	// bucket, and Snapshot reports it with Aggregate=true.
+	MaxTrackedRoutes int
+	// MaxMemberRoutesPerSlot caps how many distinct RouteIDs a single
+	// virtual bucket records in MemberRoutes. Beyond this cap the
+	// route still folds into the bucket counters (so traffic is not
+	// dropped) but the routeID is not appended — keeping per-column
+	// payload size bounded when total routes far exceed
+	// MaxTrackedRoutes. Snapshot consumers should treat the list as
+	// "first N members" rather than authoritative attribution.
+	MaxMemberRoutesPerSlot int
+	// Now overrides time.Now for tests; nil falls back to time.Now.
+	Now func() time.Time
+}
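+
+// An illustrative end-to-end wiring sketch (values arbitrary; ctx and
+// the surrounding goroutine management are assumed, not part of this
+// diff):
+//
+//	s := NewMemSampler(MemSamplerOptions{Step: 30 * time.Second})
+//	go RunFlusher(ctx, s, s.Step())
+//	// hot path: s.Observe(routeID, OpWrite, len(key), len(value))
+//	cols := s.Snapshot(time.Time{}, time.Time{}) // full history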
+
+// MemSampler is the in-process Sampler implementation. The zero value
+// is not usable — construct via NewMemSampler.
+type MemSampler struct {
+	opts MemSamplerOptions
+	now  func() time.Time
+
+	// table holds the immutable map of currently-tracked routes. The
+	// hot path Observe() does a single Load + map lookup; mutations
+	// (Register / Remove / Split / Merge) take routesMu, copy, mutate
+	// the copy, then atomic-store the new pointer.
+	table atomic.Pointer[routeTable]
+
+	// routesMu serialises the routeTable copy-on-write update path.
+	// Held only by non-hot-path callers.
+	routesMu sync.Mutex
+
+	// historyMu guards the ring buffer. Reads (Snapshot) and writes
+	// (Flush) acquire it; Observe never touches it.
+	historyMu sync.Mutex
+	history   *ringBuffer
+
+	// retiredSlots holds slots that RemoveRoute removed from the live
+	// table. Each entry is drained by every Flush inside a wall-clock
+	// grace window — a grace period that lets late Observe writers
+	// (which loaded the route table before RemoveRoute's atomic.Store)
+	// finish their Add into the slot before we forget about it.
+	// graceWindow controls the length of that window. pendingPrunes is
+	// the same idea for virtual-bucket member-route removals: the
+	// routeID stays in MemberRoutes during the grace window so the row
+	// attribution stays correct while the bucket counters still
+	// include the removed route's pre-removal increments. Both are
+	// guarded by retiredMu since they are only touched off the hot
+	// path.
+	retiredMu     sync.Mutex
+	retiredSlots  []retiredSlot
+	pendingPrunes []memberPrune
+
+	// virtualIDCounter hands out synthetic RouteIDs for new virtual
+	// buckets, starting at MaxUint64 and decrementing. The synthetic
+	// space cannot collide with real route IDs (which the coordinator
+	// assigns from the low end), so a real RouteID can never appear
+	// twice in the same column — once as an aggregate row, once as
+	// an individual row — even under register/remove churn.
+	virtualIDCounter atomic.Uint64
+}
+
+// retiredSlot tracks a removed slot through its post-removal grace
+// period. retiredAt is the wall-clock instant of removal; the entry
+// is drained on every Flush until the grace window measured from
+// retiredAt elapses (including one final drain), then dropped.
+type retiredSlot struct {
+	slot      *routeSlot
+	retiredAt time.Time
+}
+
+// memberPrune tracks a deferred virtual-bucket member-route removal.
+// retiredAt is the wall-clock instant of removal; while now() is
+// within the grace window, the routeID stays in bucket.MemberRoutes so
+// flushed rows continue attributing the bucket's mixed counters to all
+// members that contributed (including this one).
+type memberPrune struct {
+	bucket    *routeSlot
+	routeID   uint64
+	retiredAt time.Time
+}
+
+// minGraceWindow is the wall-clock floor for late-writer grace. It
+// guards against Step being configured small enough (millisecond
+// scale) that a flush-count-based grace window would expire before a
+// preempted Observe goroutine resumes. Any goroutine still holding a
+// pre-RemoveRoute table snapshot must finish its single atomic.Add
+// before now-retiredAt exceeds this window.
+const minGraceWindow = 5 * time.Second
+
+// graceStepMultiplier is how many Step intervals we want the grace
+// window to cover by default — picked so retired slots are drained
+// at least twice before being dropped.
+const graceStepMultiplier = 2
+
+// graceWindow is the duration retired slots and pending member-prunes
+// stay drainable after RemoveRoute. It's tied to wall-clock time, not
+// Flush count, so a small Step doesn't shrink the grace below
+// minGraceWindow.
+func (s *MemSampler) graceWindow() time.Duration {
+	g := graceStepMultiplier * s.opts.Step
+	if g < minGraceWindow {
+		g = minGraceWindow
+	}
+	return g
+}
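+
+// Worked arithmetic for the window above (illustrative): with the
+// default Step of 60s the grace is 2×60s = 120s; with Step = 1s the
+// product (2s) falls below the floor, so the 5s minGraceWindow applies.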
+
+// routeTable is the COW snapshot Observe operates on. Once published
+// via MemSampler.table.Store, fields are read-only.
+type routeTable struct {
+	// slots is the live route → slot map. Lookups under the hot path
+	// run against this snapshot.
+	slots map[uint64]*routeSlot
+	// virtualForRoute maps a real RouteID that didn't fit under
+	// MaxTrackedRoutes to its virtual-bucket slot. Observe consults
+	// this fallback when slots[routeID] is missing.
+	virtualForRoute map[uint64]*routeSlot
+	// sortedSlots is the union of slots and virtualForRoute values
+	// sorted by Start, used by Flush for stable row ordering.
+	sortedSlots []*routeSlot
+}
+
+// routeSlot owns the atomic counters for a tracked route or virtual
+// bucket. Counter fields are touched by Observe (atomic.AddUint64) and
+// Flush (atomic.SwapUint64); the metadata (RouteID, Start, End,
+// Aggregate, MemberRoutes) for an individual route slot is immutable
+// after the slot is published.
+//
+// Virtual buckets are the exception: when a new RouteID over the
+// MaxTrackedRoutes cap folds into an existing bucket, RegisterRoute
+// extends MemberRoutes and may grow Start/End. metaMu serialises
+// those updates against Flush's read-side iteration so a concurrent
+// Flush cannot observe a half-extended slice. Observe never reads
+// these fields, so the hot path remains lockless.
+type routeSlot struct {
+	metaMu  sync.RWMutex
+	RouteID uint64
+	Start   []byte
+	End     []byte
+	// Aggregate marks virtual buckets that fold multiple coarsened
+	// routes together (Snapshot surfaces this in MatrixRow).
+	Aggregate    bool
+	MemberRoutes []uint64
+
+	reads      atomic.Uint64
+	writes     atomic.Uint64
+	readBytes  atomic.Uint64
+	writeBytes atomic.Uint64
+}
+
+// snapshotMeta returns a defensive copy of the slot's metadata under
+// the read lock. Used by Flush so the row it emits doesn't share
+// Start/End/MemberRoutes with the live slot (which a later
+// RegisterRoute may extend, and which the snapshot API exports to
+// external consumers that may mutate the bounds).
+func (s *routeSlot) snapshotMeta() (start, end []byte, aggregate bool, members []uint64) {
+	s.metaMu.RLock()
+	defer s.metaMu.RUnlock()
+	start = cloneBytes(s.Start)
+	end = cloneBytes(s.End)
+	aggregate = s.Aggregate
+	if len(s.MemberRoutes) > 0 {
+		members = append([]uint64(nil), s.MemberRoutes...)
+	}
+	return
+}
+
+// MatrixColumn is one slice of the heatmap at a single flush time.
+type MatrixColumn struct {
+	At   time.Time
+	Rows []MatrixRow
+}
+
+// MatrixRow is a single route or virtual bucket's counter snapshot
+// taken at flush time.
+type MatrixRow struct {
+	RouteID      uint64
+	Start, End   []byte
+	Aggregate    bool
+	MemberRoutes []uint64
+
+	Reads      uint64
+	Writes     uint64
+	ReadBytes  uint64
+	WriteBytes uint64
+}
+
+// NewMemSampler constructs a sampler with the supplied options. Zero
+// and negative fields fall back to the Default* constants (the
+// minimum-capacity clamp in newRingBuffer is only a further backstop).
+// Always returns a usable sampler — callers should pass a zero options
+// struct for the default configuration.
+func NewMemSampler(opts MemSamplerOptions) *MemSampler {
+	if opts.Step <= 0 {
+		opts.Step = DefaultStep
+	}
+	if opts.HistoryColumns <= 0 {
+		opts.HistoryColumns = DefaultHistoryColumns
+	}
+	if opts.MaxTrackedRoutes <= 0 {
+		opts.MaxTrackedRoutes = DefaultMaxTrackedRoutes
+	}
+	if opts.MaxMemberRoutesPerSlot <= 0 {
+		opts.MaxMemberRoutesPerSlot = DefaultMaxMemberRoutesPerSlot
+	}
+	now := opts.Now
+	if now == nil {
+		now = time.Now
+	}
+	s := &MemSampler{
+		opts:    opts,
+		now:     now,
+		history: newRingBuffer(opts.HistoryColumns),
+	}
+	s.table.Store(newEmptyRouteTable())
+	return s
+}
+
+func newEmptyRouteTable() *routeTable {
+	return &routeTable{
+		slots:           map[uint64]*routeSlot{},
+		virtualForRoute: map[uint64]*routeSlot{},
+	}
+}
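+
+// An illustrative hot-path call site (the coordinator wiring shown
+// here is assumed, not part of this diff):
+//
+//	// after resolving the request's RouteID:
+//	sampler.Observe(routeID, OpWrite, len(key), len(value))
+//	sampler.Observe(routeID, OpRead, 0, 0) // read path, sizes ignored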
+
+// Observe records one request against a route. Cost on a hit:
+// atomic.Pointer.Load + plain map lookup + 2× atomic.AddUint64 (count
+// and bytes). Misses (RouteID never registered) drop silently — the
+// route-watch subscriber is responsible for RegisterRoute before the
+// coordinator publishes the new RouteID.
+func (s *MemSampler) Observe(routeID uint64, op Op, keyLen, valueLen int) {
+	if s == nil {
+		return
+	}
+	tbl := s.table.Load()
+	slot, ok := tbl.slots[routeID]
+	if !ok {
+		slot, ok = tbl.virtualForRoute[routeID]
+		if !ok {
+			return
+		}
+	}
+	byteCount := uint64(0)
+	if keyLen > 0 {
+		byteCount += uint64(keyLen)
+	}
+	if valueLen > 0 {
+		byteCount += uint64(valueLen)
+	}
+	switch op {
+	case OpRead:
+		slot.reads.Add(1)
+		if byteCount > 0 {
+			slot.readBytes.Add(byteCount)
+		}
+	case OpWrite:
+		slot.writes.Add(1)
+		if byteCount > 0 {
+			slot.writeBytes.Add(byteCount)
+		}
+	}
+}
+
+// RegisterRoute adds a (RouteID, [Start, End)) pair to the tracking
+// set. Returns true when the route gets its own slot, false when the
+// MaxTrackedRoutes cap was hit and the route was folded into a
+// virtual aggregate bucket. Idempotent: calling twice with the same
+// RouteID is a no-op (the original slot stays in place).
+//
+// If a previous RemoveRoute(routeID) queued a deferred member-prune
+// and the route is now re-registered into the SAME bucket inside the
+// grace window, that prune is cancelled — otherwise the routeID
+// would disappear from bucket.MemberRoutes despite Observe still
+// attributing fresh traffic to it. Prunes for different buckets (or
+// when the route rejoins as an individual slot) are left alone so
+// the old bucket's MemberRoutes is correctly cleaned up.
+func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool {
+	if s == nil {
+		return false
+	}
+	s.routesMu.Lock()
+	defer s.routesMu.Unlock()
+
+	cur := s.table.Load()
+	if _, ok := cur.slots[routeID]; ok {
+		return true
+	}
+	if _, ok := cur.virtualForRoute[routeID]; ok {
+		return false
+	}
+
+	next := copyRouteTable(cur)
+	if len(next.slots) < s.opts.MaxTrackedRoutes {
+		slot := s.reclaimRetiredSlot(routeID)
+		if slot == nil {
+			slot = &routeSlot{
+				RouteID: routeID,
+				Start:   cloneBytes(start),
+				End:     cloneBytes(end),
+			}
+		} else {
+			// Re-registering the same routeID inside the grace window:
+			// reuse the retired slot so any in-flight Observe writers
+			// hitting the prior table snapshot land on the same slot
+			// the new traffic uses, and Flush emits a single row per
+			// RouteID instead of two (one live + one retired) with
+			// counts split across them. Refresh the metadata fields to
+			// match the new registration; counters are preserved.
+			slot.metaMu.Lock()
+			slot.Start = cloneBytes(start)
+			slot.End = cloneBytes(end)
+			slot.metaMu.Unlock()
+		}
+		next.slots[routeID] = slot
+		next.sortedSlots = appendSorted(next.sortedSlots, slot)
+		s.table.Store(next)
+		return true
+	}
+
+	// Coarsening: prefer the first virtual bucket whose [Start, End)
+	// covers `start`; if none does, fall back to the first aggregate
+	// in sortedSlots order (lowest Start). A new bucket is created
+	// only when no virtual bucket exists yet. See findVirtualBucket
+	// for the exact selection.
+	bucket := findVirtualBucket(next.sortedSlots, start)
+	if bucket == nil {
+		bucket = &routeSlot{
+			RouteID:      s.nextVirtualBucketID(),
+			Start:        cloneBytes(start),
+			End:          cloneBytes(end),
+			Aggregate:    true,
+			MemberRoutes: []uint64{routeID},
+		}
+		next.sortedSlots = appendSorted(next.sortedSlots, bucket)
+	} else {
+		s.foldIntoBucket(next, bucket, routeID, start, end)
+	}
+	// The route is rejoining `bucket`; cancel any deferred prune for
+	// this same routeID against this same bucket so it stays in
+	// MemberRoutes. Prunes against other buckets are intentionally
+	// left in place.
+	s.cancelPendingPruneFor(bucket, routeID)
+	next.virtualForRoute[routeID] = bucket
+	s.table.Store(next)
+	return false
+}
+
+// foldIntoBucket extends an existing virtual bucket to cover routeID's
+// [start, end). Mutates bucket under its metaMu so a concurrent Flush
+// iterating the previous table's snapshot doesn't observe a
+// half-extended MemberRoutes slice or partially-updated Start/End.
+// Counters live next to the metadata but are protected by their own
+// atomic ops, not metaMu. If Start is lowered, sortedSlots is rebuilt
+// to preserve Flush's key-order contract.
+//
+// MemberRoutes growth is capped by MaxMemberRoutesPerSlot — beyond
+// that cap the bucket counters still absorb the route's traffic, but
+// the routeID is not added to the visible member list.
+func (s *MemSampler) foldIntoBucket(next *routeTable, bucket *routeSlot, routeID uint64, start, end []byte) {
+	bucket.metaMu.Lock()
+	if !memberRoutesContains(bucket.MemberRoutes, routeID) &&
+		len(bucket.MemberRoutes) < s.opts.MaxMemberRoutesPerSlot {
+		bucket.MemberRoutes = append(bucket.MemberRoutes, routeID)
+	}
+	if len(end) == 0 || (len(bucket.End) != 0 && bytesGT(end, bucket.End)) {
+		bucket.End = cloneBytes(end)
+	}
+	startLowered := bytesLT(start, bucket.Start)
+	if startLowered {
+		bucket.Start = cloneBytes(start)
+	}
+	bucket.metaMu.Unlock()
+	if startLowered {
+		next.sortedSlots = rebuildSorted(next)
+	}
+}
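+
+// A worked bounds example for the fold above (hypothetical keys):
+// folding route [a, b) into a bucket covering [c, e) lowers
+// bucket.Start to "a" and triggers a sortedSlots rebuild; folding
+// [f, g) into the same bucket raises bucket.End to "g"; folding a
+// route whose End is empty makes the bucket unbounded above.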
+
+// RemoveRoute drops a RouteID from tracking. Counts accumulated since
+// the last flush are NOT lost: the retired slot (or, for virtual-bucket
+// members, just the membership entry) is queued for one final drain by
+// the next Flush. Subsequent Observe(routeID, …) calls are silent
+// no-ops. Idempotent.
+func (s *MemSampler) RemoveRoute(routeID uint64) {
+	if s == nil {
+		return
+	}
+	s.routesMu.Lock()
+	defer s.routesMu.Unlock()
+	cur := s.table.Load()
+	individual, isIndividual := cur.slots[routeID]
+	bucket, isVirtual := cur.virtualForRoute[routeID]
+	if !isIndividual && !isVirtual {
+		return
+	}
+
+	next := copyRouteTable(cur)
+	delete(next.slots, routeID)
+	delete(next.virtualForRoute, routeID)
+
+	retiredAt := s.now()
+	switch {
+	case isIndividual:
+		// Pending counters in this slot must be harvested by upcoming
+		// Flush cycles. Drain across the grace window so an Observe
+		// call that loaded the prior table just before this
+		// atomic.Store can still complete its Add into the slot — that
+		// in-flight write is caught by a later drain rather than
+		// silently lost.
+		s.retiredMu.Lock()
+		s.retiredSlots = append(s.retiredSlots, retiredSlot{slot: individual, retiredAt: retiredAt})
+		s.retiredMu.Unlock()
+	case isVirtual:
+		// Defer pruning until after the bucket's pre-removal counters
+		// have been drained. While the prune is pending the routeID
+		// stays in MemberRoutes so the next few Flush rows attribute
+		// the bucket's mixed counters to all members that contributed
+		// to them — including the route we are removing.
+		s.retiredMu.Lock()
+		s.pendingPrunes = append(s.pendingPrunes, memberPrune{
+			bucket: bucket, routeID: routeID, retiredAt: retiredAt,
+		})
+		// If this delete left the bucket with no remaining
+		// virtualForRoute mapping, rebuildSorted will drop it from the
+		// live sortedSlots. Queue it as a retired slot so Flush keeps
+		// draining its counters across the grace window — otherwise
+		// pre-removal increments and any in-flight Observe writers
+		// hitting the prior table snapshot would be silently lost.
+		if !bucketStillReferenced(next.virtualForRoute, bucket) {
+			s.retiredSlots = append(s.retiredSlots, retiredSlot{
+				slot: bucket, retiredAt: retiredAt,
+			})
+		}
+		s.retiredMu.Unlock()
+	}
+
+	next.sortedSlots = rebuildSorted(next)
+	s.table.Store(next)
+}
+
+// Flush drains every slot's counters with atomic.SwapUint64 and
+// appends one MatrixColumn to the ring buffer. Idle slots (all
+// counters zero) are skipped to keep the column compact. Slots that
+// RemoveRoute retired since the previous Flush are drained alongside
+// the live table, so route churn does not silently lose counts.
+//
+// Rows are emitted in Start-key order regardless of which slot list
+// they came from (live, retired, or virtual-member-pruned), preserving
+// the API contract that consumers can rely on Rows being sorted by
+// Start within every column.
+func (s *MemSampler) Flush() {
+	if s == nil {
+		return
+	}
+	col := MatrixColumn{At: s.now()}
+	tbl := s.table.Load()
+	for _, slot := range tbl.sortedSlots {
+		col.Rows = appendDrainedRow(col.Rows, slot)
+	}
+
+	grace := s.graceWindow()
+	s.retiredMu.Lock()
+	s.retiredSlots = drainRetiredSlots(s.retiredSlots, &col.Rows, col.At, grace)
+	s.pendingPrunes = advancePendingPrunes(s.pendingPrunes, col.At, grace)
+	s.retiredMu.Unlock()
+
+	sort.SliceStable(col.Rows, func(i, j int) bool {
+		return bytesLT(col.Rows[i].Start, col.Rows[j].Start)
+	})
+
+	s.historyMu.Lock()
+	s.history.Push(col)
+	s.historyMu.Unlock()
+}
+
+// drainRetiredSlots emits a row for each retired slot and returns the
+// entries whose grace window has not yet elapsed. Rows are appended
+// to *rows so the caller sees the slice growth. Entries whose
+// elapsed time (now - retiredAt) has reached grace are dropped after
+// this final drain. The dropped tail of the backing array is zeroed
+// so released *routeSlot pointers do not stay GC-reachable through
+// the reused capacity.
+func drainRetiredSlots(retired []retiredSlot, rows *[]MatrixRow, now time.Time, grace time.Duration) []retiredSlot {
+	keep := retired[:0]
+	for _, r := range retired {
+		*rows = appendDrainedRow(*rows, r.slot)
+		if now.Sub(r.retiredAt) < grace {
+			keep = append(keep, r)
+		}
+	}
+	clearTail(retired, len(keep))
+	return keep
+}
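+
+// Lifecycle sketch for a retired slot (illustrative): an entry queued
+// at time T is drained by every Flush while now-T < grace, takes one
+// final drain on the first Flush at or past T+grace, and is then
+// dropped from the queue.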
+
+// advancePendingPrunes lets each pending member-prune live until its
+// retiredAt+grace passes, then actually prunes the routeID from the
+// bucket's MemberRoutes. Returns the entries still inside the grace
+// window. Like drainRetiredSlots, the dropped tail is zeroed so
+// released bucket pointers don't linger via the reused capacity.
+func advancePendingPrunes(pending []memberPrune, now time.Time, grace time.Duration) []memberPrune {
+	keep := pending[:0]
+	for _, p := range pending {
+		if now.Sub(p.retiredAt) < grace {
+			keep = append(keep, p)
+			continue
+		}
+		pruneMemberRoute(p.bucket, p.routeID)
+	}
+	clearPruneTail(pending, len(keep))
+	return keep
+}
+
+// cancelPendingPruneFor drops any pendingPrune entry whose routeID
+// AND bucket pointer both match — i.e. the same routeID is rejoining
+// the same bucket it was just removed from. This stops a deferred
+// prune from removing the routeID from MemberRoutes despite Observe
+// still attributing traffic to it. Prunes against other buckets (or
+// where the route rejoins as an individual slot) are left in place
+// so the old bucket's MemberRoutes is correctly cleaned up.
+//
+// Holds retiredMu briefly off the hot path; callers must already
+// hold routesMu so the cancellation pairs atomically with the
+// route-table mutation.
+func (s *MemSampler) cancelPendingPruneFor(bucket *routeSlot, routeID uint64) {
+	s.retiredMu.Lock()
+	defer s.retiredMu.Unlock()
+	keep := s.pendingPrunes[:0]
+	for _, p := range s.pendingPrunes {
+		if p.routeID == routeID && p.bucket == bucket {
+			continue
+		}
+		keep = append(keep, p)
+	}
+	clearPruneTail(s.pendingPrunes, len(keep))
+	s.pendingPrunes = keep
+}
+
+// nextVirtualBucketID returns a synthetic RouteID for a brand-new
+// virtual bucket. Synthetic IDs come from the high end of uint64 and
+// decrement, so they cannot collide with real route IDs (which are
+// assigned from the low end by the coordinator). Without this,
+// stamping the bucket with the first folded real RouteID would mean
+// that ID could later show up on TWO rows in the same column — one
+// aggregate, one individual — if the original route is later
+// re-registered as an individual slot.
+func (s *MemSampler) nextVirtualBucketID() uint64 {
+	// atomic.Uint64 starts at 0; adding ^uint64(0) wraps to MaxUint64
+	// on the first call, MaxUint64-1 on the second, etc.
+	return s.virtualIDCounter.Add(^uint64(0))
+}
+
+// reclaimRetiredSlot looks for a non-aggregate retired slot whose
+// RouteID matches the supplied routeID and, if found, removes it
+// from retiredSlots and returns it for reuse. This guarantees a
+// route removed and re-registered inside the grace window is
+// represented by a single *routeSlot — Flush would otherwise emit
+// two rows with the same RouteID (one from the new live slot, one
+// from the still-draining retired slot) and split counts across
+// them. Aggregate (orphaned virtual bucket) entries are left in
+// place because their RouteID lives in the synthetic namespace and
+// a real-ID match against an aggregate would be coincidental.
+func (s *MemSampler) reclaimRetiredSlot(routeID uint64) *routeSlot {
+	s.retiredMu.Lock()
+	defer s.retiredMu.Unlock()
+	var reclaimed *routeSlot
+	keep := s.retiredSlots[:0]
+	for _, r := range s.retiredSlots {
+		if reclaimed == nil && !r.slot.Aggregate && r.slot.RouteID == routeID {
+			reclaimed = r.slot
+			continue
+		}
+		keep = append(keep, r)
+	}
+	clearTail(s.retiredSlots, len(keep))
+	s.retiredSlots = keep
+	return reclaimed
+}
+
+// memberRoutesContains reports whether routeID is already listed in
+// members. Used as a dedup guard so re-registering a routeID inside
+// the prune grace window doesn't add a duplicate MemberRoutes entry.
+func memberRoutesContains(members []uint64, routeID uint64) bool {
+	for _, m := range members {
+		if m == routeID {
+			return true
+		}
+	}
+	return false
+}
+
+// clearTail zeroes the [keepLen, len(s)) range of s so dropped entries
+// don't keep their *routeSlot pointers GC-reachable through the
+// reused backing array.
+func clearTail(s []retiredSlot, keepLen int) {
+	for i := keepLen; i < len(s); i++ {
+		s[i] = retiredSlot{}
+	}
+}
+
+// clearPruneTail is clearTail for the pendingPrunes queue.
+func clearPruneTail(s []memberPrune, keepLen int) {
+	for i := keepLen; i < len(s); i++ {
+		s[i] = memberPrune{}
+	}
+}
+
+// bucketStillReferenced reports whether any RouteID in
+// virtualForRoute still maps to bucket. Used by RemoveRoute to detect
+// when a virtual bucket has lost its last member and must be retired
+// for grace draining.
+func bucketStillReferenced(virtualForRoute map[uint64]*routeSlot, bucket *routeSlot) bool {
+	for _, b := range virtualForRoute {
+		if b == bucket {
+			return true
+		}
+	}
+	return false
+}
+
+// pruneMemberRoute removes routeID from bucket.MemberRoutes under the
+// bucket's metaMu so a concurrent snapshotMeta reader sees a
+// consistent view.
+func pruneMemberRoute(bucket *routeSlot, routeID uint64) {
+	bucket.metaMu.Lock()
+	defer bucket.metaMu.Unlock()
+	filtered := bucket.MemberRoutes[:0]
+	for _, m := range bucket.MemberRoutes {
+		if m != routeID {
+			filtered = append(filtered, m)
+		}
+	}
+	bucket.MemberRoutes = filtered
+}
+
+// Step returns the configured flush interval after applying default
+// fallbacks. Callers wiring up RunFlusher can use this to align their
+// ticker with the sampler's expectations rather than passing the
+// interval through two configuration paths.
+func (s *MemSampler) Step() time.Duration {
+	if s == nil {
+		return DefaultStep
+	}
+	return s.opts.Step
+}
+
+// appendDrainedRow swaps the slot's counters to zero and appends a
+// MatrixRow when any counter was non-zero. Idle slots are skipped.
+// Metadata is read under the slot's metaMu so a concurrent
+// RegisterRoute fold cannot race with the row materialisation.
+func appendDrainedRow(rows []MatrixRow, slot *routeSlot) []MatrixRow {
+	reads := slot.reads.Swap(0)
+	writes := slot.writes.Swap(0)
+	readBytes := slot.readBytes.Swap(0)
+	writeBytes := slot.writeBytes.Swap(0)
+	if reads == 0 && writes == 0 && readBytes == 0 && writeBytes == 0 {
+		return rows
+	}
+	start, end, aggregate, members := slot.snapshotMeta()
+	return append(rows, MatrixRow{
+		RouteID:      slot.RouteID,
+		Start:        start,
+		End:          end,
+		Aggregate:    aggregate,
+		MemberRoutes: members,
+		Reads:        reads,
+		Writes:       writes,
+		ReadBytes:    readBytes,
+		WriteBytes:   writeBytes,
+	})
+}
+
+// Snapshot returns the matrix columns in the supplied [from, to)
+// half-open time range. Ordering is oldest-first. Snapshot takes no
+// Series argument: every row carries all four counters plus the slot
+// metadata (RouteID, Start, End, Aggregate, MemberRoutes), so a UI
+// can render the same response under different series without a
+// re-query.
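+//
+// Illustrative caller-side use (the one-hour window is arbitrary):
+//
+//	cols := s.Snapshot(time.Now().Add(-time.Hour), time.Time{})
+//	for _, col := range cols {
+//		for _, row := range col.Rows {
+//			_ = row.Reads // or Writes / ReadBytes / WriteBytes
+//		}
+//	}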
+func (s *MemSampler) Snapshot(from, to time.Time) []MatrixColumn {
+	if s == nil {
+		return nil
+	}
+	s.historyMu.Lock()
+	defer s.historyMu.Unlock()
+	return s.history.Range(from, to)
+}
+
+// helpers below -----------------------------------------------------
+
+func copyRouteTable(src *routeTable) *routeTable {
+	dst := &routeTable{
+		slots:           make(map[uint64]*routeSlot, len(src.slots)+1),
+		virtualForRoute: make(map[uint64]*routeSlot, len(src.virtualForRoute)+1),
+		sortedSlots:     append([]*routeSlot(nil), src.sortedSlots...),
+	}
+	for k, v := range src.slots {
+		dst.slots[k] = v
+	}
+	for k, v := range src.virtualForRoute {
+		dst.virtualForRoute[k] = v
+	}
+	return dst
+}
+
+func appendSorted(slots []*routeSlot, slot *routeSlot) []*routeSlot {
+	idx := sort.Search(len(slots), func(i int) bool {
+		return bytesGE(slots[i].Start, slot.Start)
+	})
+	slots = append(slots, nil)
+	copy(slots[idx+1:], slots[idx:])
+	slots[idx] = slot
+	return slots
+}
+
+func rebuildSorted(tbl *routeTable) []*routeSlot {
+	out := make([]*routeSlot, 0, len(tbl.slots)+len(tbl.virtualForRoute))
+	seen := make(map[*routeSlot]struct{}, cap(out))
+	for _, s := range tbl.slots {
+		if _, dup := seen[s]; dup {
+			continue
+		}
+		seen[s] = struct{}{}
+		out = append(out, s)
+	}
+	for _, s := range tbl.virtualForRoute {
+		if _, dup := seen[s]; dup {
+			continue
+		}
+		seen[s] = struct{}{}
+		out = append(out, s)
+	}
+	sort.Slice(out, func(i, j int) bool {
+		return bytesLT(out[i].Start, out[j].Start)
+	})
+	return out
+}
+
+// findVirtualBucket returns the virtual bucket an over-budget route
+// should fold into, preferring a bucket whose [Start, End) range
+// covers start. If no bucket covers start, returns the first
+// aggregate in sortedSlots order so over-budget routes collapse into
+// a single global bucket rather than fragmenting across many.
+// Returns nil only when no virtual bucket exists yet — caller
+// creates one in that case.
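+//
+// A worked selection example (hypothetical buckets): with aggregates
+// covering [b, d) and [m, p) in sortedSlots, start "c" picks [b, d)
+// (covering match); start "z" also picks [b, d) (first-aggregate
+// fallback); with no aggregate present the result is nil.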
+func findVirtualBucket(sorted []*routeSlot, start []byte) *routeSlot {
+	for _, s := range sorted {
+		if !s.Aggregate {
+			continue
+		}
+		if bytesLE(s.Start, start) && (len(s.End) == 0 || bytesLT(start, s.End)) {
+			return s
+		}
+	}
+	for _, s := range sorted {
+		if s.Aggregate {
+			return s
+		}
+	}
+	return nil
+}
+
+func cloneBytes(b []byte) []byte {
+	if len(b) == 0 {
+		return nil
+	}
+	out := make([]byte, len(b))
+	copy(out, b)
+	return out
+}
+
+func bytesLT(a, b []byte) bool { return bytes.Compare(a, b) < 0 }
+func bytesLE(a, b []byte) bool { return bytes.Compare(a, b) <= 0 }
+func bytesGT(a, b []byte) bool { return bytes.Compare(a, b) > 0 }
+func bytesGE(a, b []byte) bool { return bytes.Compare(a, b) >= 0 }
diff --git a/keyviz/sampler_test.go b/keyviz/sampler_test.go
new file mode 100644
index 000000000..b41fbc548
--- /dev/null
+++ b/keyviz/sampler_test.go
@@ -0,0 +1,982 @@
+package keyviz
+
+import (
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+func newTestSampler(t *testing.T, opts MemSamplerOptions) (*MemSampler, *fakeClock) {
+	t.Helper()
+	clk := &fakeClock{now: time.Unix(1_700_000_000, 0)}
+	if opts.Now == nil {
+		opts.Now = clk.Now
+	}
+	return NewMemSampler(opts), clk
+}
+
+type fakeClock struct {
+	mu  sync.Mutex
+	now time.Time
+}
+
+func (c *fakeClock) Now() time.Time {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.now
+}
+
+func (c *fakeClock) Advance(d time.Duration) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.now = c.now.Add(d)
+}
+
+func TestNilSamplerObserveSafe(t *testing.T) {
+	t.Parallel()
+	var s *MemSampler // nil
+	s.Observe(1, OpRead, 10, 20)
+	if s.RegisterRoute(1, []byte("a"), []byte("b")) {
+		t.Fatal("nil RegisterRoute should return false")
+	}
+	s.RemoveRoute(1)
+	s.Flush()
+	if got := s.Snapshot(time.Time{}, time.Time{}); got != nil {
+		t.Fatalf("nil Snapshot should be nil, got %v", got)
+	}
+}
+
+func TestObserveAndFlushBasic(t *testing.T) {
+	t.Parallel()
+	s, clk := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 4})
+	if !s.RegisterRoute(1, []byte("a"), []byte("c")) {
+		t.Fatal("RegisterRoute(1) returned false")
+	}
+	s.Observe(1, OpRead, 5, 10)
+	s.Observe(1, OpRead, 5, 10)
+	s.Observe(1, OpWrite, 5, 100)
+
+	s.Flush()
+	clk.Advance(time.Second)
+	s.Flush() // empty
+
+	cols := s.Snapshot(time.Time{}, time.Time{})
+	if len(cols) != 2 {
+		t.Fatalf("got %d columns, want 2", len(cols))
+	}
+	c0 := cols[0]
+	if len(c0.Rows) != 1 {
+		t.Fatalf("col0 rows = %d, want 1", len(c0.Rows))
+	}
+	r := c0.Rows[0]
+	if r.RouteID != 1 || r.Reads != 2 || r.Writes != 1 || r.ReadBytes != 30 || r.WriteBytes != 105 {
+		t.Fatalf("col0 row = %+v", r)
+	}
+	if len(cols[1].Rows) != 0 {
+		t.Fatalf("col1 should be empty (no observe between flushes), got %v", cols[1].Rows)
+	}
+}
+
+// TestNoCountsLostAcrossFlush asserts the SwapUint64 flush protocol
+// doesn't drop counts even when many goroutines hammer Observe across
+// the flush boundary. Interleaves repeated flushes with the writers
+// to shake out scheduling races.
+func TestNoCountsLostAcrossFlush(t *testing.T) {
+	t.Parallel()
+	const (
+		writers   = 8
+		perWriter = 5_000
+		flushes   = 20
+	)
+	s, _ := newTestSampler(t, MemSamplerOptions{Step: time.Millisecond, HistoryColumns: 1024})
+	if !s.RegisterRoute(1, []byte("a"), []byte("b")) {
+		t.Fatal("Register failed")
+	}
+
+	stop := make(chan struct{})
+	var wg sync.WaitGroup
+	for w := 0; w < writers; w++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for n := 0; n < perWriter; n++ {
+				select {
+				case <-stop:
+					return
+				default:
+				}
+				s.Observe(1, OpRead, 1, 0)
+			}
+		}()
+	}
+	// Fire flushes from a separate goroutine while writers run.
+	flushDone := make(chan struct{})
+	go func() {
+		defer close(flushDone)
+		for i := 0; i < flushes; i++ {
+			s.Flush()
+			time.Sleep(time.Microsecond * 100)
+		}
+	}()
+
+	wg.Wait()
+	close(stop)
+	<-flushDone
+	// One last flush so any straggler atomic.Add is harvested.
+	s.Flush()
+
+	var total uint64
+	for _, col := range s.Snapshot(time.Time{}, time.Time{}) {
+		for _, row := range col.Rows {
+			total += row.Reads
+		}
+	}
+	want := uint64(writers * perWriter)
+	if total != want {
+		t.Fatalf("total reads = %d, want %d (lost %d)", total, want, want-total)
+	}
+}
+
+func TestRouteBudgetCoarsensIntoVirtualBucket(t *testing.T) {
+	t.Parallel()
+	s := budgetSetup(t)
+	s.Observe(1, OpRead, 1, 0)
+	s.Observe(2, OpRead, 1, 0)
+	s.Observe(3, OpRead, 1, 0)
+	s.Flush()
+	rows := budgetSingleColumnRows(t, s)
+	agg := findAggregateRow(t, rows)
+	if len(agg.MemberRoutes) != 1 || agg.MemberRoutes[0] != 3 {
+		t.Fatalf("aggregate.MemberRoutes = %v, want [3]", agg.MemberRoutes)
+	}
+}
+
+func budgetSetup(t *testing.T) *MemSampler {
+	t.Helper()
+	s, _ := newTestSampler(t, MemSamplerOptions{
+		Step:             time.Second,
+		HistoryColumns:   4,
+		MaxTrackedRoutes: 2,
+	})
+	if !s.RegisterRoute(1, []byte("a"), []byte("c")) {
+		t.Fatal("route 1 should fit")
+	}
+	if !s.RegisterRoute(2, []byte("c"), []byte("e")) {
+		t.Fatal("route 2 should fit")
+	}
+	if s.RegisterRoute(3, []byte("e"), []byte("g")) {
+		t.Fatal("route 3 over budget should return false (folded into virtual bucket)")
+	}
+	return s
+}
+
+func budgetSingleColumnRows(t *testing.T, s *MemSampler) []MatrixRow {
+	t.Helper()
+	cols := s.Snapshot(time.Time{}, time.Time{})
+	if len(cols) != 1 {
+		t.Fatalf("len(cols) = %d", len(cols))
+	}
+	rows := cols[0].Rows
+	if len(rows) != 3 {
+		t.Fatalf("rows = %d, want 3", len(rows))
+	}
+	return rows
+}
+
+func findAggregateRow(t *testing.T, rows []MatrixRow) MatrixRow {
+	t.Helper()
+	for i := range rows {
+		if rows[i].Aggregate {
+			return rows[i]
+		}
+	}
+	t.Fatalf("no aggregate row in output: %+v", rows)
+	return MatrixRow{}
+}
+
+func TestSnapshotRangeFilters(t *testing.T) {
+	t.Parallel()
+	s, clk := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 8})
+	s.RegisterRoute(1, []byte("a"), []byte("b"))
+	for i := 0; i < 5; i++ {
+		s.Observe(1, OpRead, 0, 0)
+		s.Flush()
+		clk.Advance(time.Second)
+	}
+	all := s.Snapshot(time.Time{}, time.Time{})
+	if len(all) != 5 {
+		t.Fatalf("all len = %d", len(all))
+	}
+	mid := all[1].At
+	end := all[3].At
+	got := s.Snapshot(mid, end)
+	if len(got) != 2 {
+		t.Fatalf("[mid,end) len = %d, want 2 (entries at +1s and +2s); cols=%v", len(got), columnTimes(all))
+	}
+	if !got[0].At.Equal(mid) {
+		t.Fatalf("got[0].At = %v, want %v", got[0].At, mid)
+	}
+}
+
+func TestRingBufferDropsOldest(t *testing.T) {
+	t.Parallel()
+	s, clk := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 3})
+	s.RegisterRoute(1, []byte("a"), []byte("b"))
+	for i := 0; i < 5; i++ {
+		s.Observe(1, OpRead, 0, 0)
+		s.Flush()
+		clk.Advance(time.Second)
+	}
+	all := s.Snapshot(time.Time{}, time.Time{})
+	if len(all) != 3 {
+		t.Fatalf("len = %d, want 3 (cap)", len(all))
+	}
+	// Should be the most recent three columns, oldest first.
+	for i := 1; i < len(all); i++ {
+		if !all[i].At.After(all[i-1].At) {
+			t.Fatalf("not chronological at %d: %v then %v", i, all[i-1].At, all[i].At)
+		}
+	}
+}
+
+func TestRemoveRouteSilencesObserve(t *testing.T) {
+	t.Parallel()
+	s, _ := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 4})
+	s.RegisterRoute(1, []byte("a"), []byte("b"))
+	s.Observe(1, OpRead, 0, 0)
+	s.RemoveRoute(1)
+	s.Observe(1, OpRead, 0, 0) // silently dropped
+	s.Flush()
+	cols := s.Snapshot(time.Time{}, time.Time{})
+	for _, c := range cols {
+		for _, r := range c.Rows {
+			if r.RouteID == 1 && r.Reads > 1 {
+				t.Fatalf("post-remove Observe leaked: %+v", r)
+			}
+		}
+	}
+}
+
+// TestRemoveRouteHarvestsPendingCounts pins Codex P2: counts
+// accumulated between the last flush and RemoveRoute must not be
+// silently dropped. The retired slot is queued for one final drain.
+func TestRemoveRouteHarvestsPendingCounts(t *testing.T) {
+	t.Parallel()
+	s, _ := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 4})
+	s.RegisterRoute(1, []byte("a"), []byte("b"))
+	for i := 0; i < 7; i++ {
+		s.Observe(1, OpRead, 1, 0)
+	}
+	// Remove BEFORE flushing — pending 7 reads must surface.
+	s.RemoveRoute(1)
+	s.Flush()
+
+	cols := s.Snapshot(time.Time{}, time.Time{})
+	if len(cols) != 1 {
+		t.Fatalf("len(cols) = %d, want 1", len(cols))
+	}
+	rows := cols[0].Rows
+	if len(rows) != 1 || rows[0].RouteID != 1 || rows[0].Reads != 7 {
+		t.Fatalf("rows = %+v, want one row with route=1 reads=7", rows)
+	}
+}
+
+// TestRemoveVirtualMemberPrunesMemberRoutes pins Codex P2: when a
+// coarsened (virtual-bucket) RouteID is removed, the bucket's
+// MemberRoutes list must eventually drop that ID. Pruning is deferred
+// across the wall-clock grace window so the row attribution stays
+// correct while the bucket counters still include the removed route's
+// pre-removal increments — TestRemoveVirtualMemberPruneDeferred pins
+// the grace-window half of this contract.
+func TestRemoveVirtualMemberPrunesMemberRoutes(t *testing.T) {
+	t.Parallel()
+	s, clk := setupOneIndividualPlusVirtualBucket(t)
+	s.RemoveRoute(2)
+	// Drain at least once within grace, then advance past grace so the
+	// next Flush actually executes the prune. One more Flush captures
+	// the post-prune MemberRoutes state in a row.
+	s.Observe(3, OpRead, 1, 0)
+	s.Flush()
+	clk.Advance(s.graceWindow() + time.Second)
+	s.Observe(3, OpRead, 1, 0)
+	s.Flush()
+	s.Observe(3, OpRead, 1, 0)
+	s.Flush()
+
+	agg := findAggregateRow(t, lastSnapshotColumn(t, s).Rows)
+	if memberRoutesContain(agg.MemberRoutes, 2) {
+		t.Fatalf("after grace, removed route 2 still in MemberRoutes: %v", agg.MemberRoutes)
+	}
+	if len(agg.MemberRoutes) != 1 || agg.MemberRoutes[0] != 3 {
+		t.Fatalf("MemberRoutes = %v, want [3]", agg.MemberRoutes)
+	}
+}
+
+// TestRemoveVirtualMemberPruneDeferred pins the deferred-prune
+// contract: the removed routeID stays in MemberRoutes throughout the
+// wall-clock grace window so flushed rows whose counters still include
+// that route's pre-removal increments attribute the traffic correctly.
+func TestRemoveVirtualMemberPruneDeferred(t *testing.T) {
+	t.Parallel()
+	s, _ := setupOneIndividualPlusVirtualBucket(t)
+	s.RemoveRoute(2)
+	// Two flushes inside the grace window — the clock is not advanced,
+	// so each Flush sees now-retiredAt == 0 < grace and keeps the
+	// prune entry.
+	for i := 0; i < 2; i++ {
+		s.Observe(3, OpRead, 1, 0)
+		s.Flush()
+		col := lastSnapshotColumn(t, s)
+		agg := findAggregateRow(t, col.Rows)
+		if !memberRoutesContain(agg.MemberRoutes, 2) {
+			t.Fatalf("flush %d within grace dropped route 2 too early: %v", i, agg.MemberRoutes)
+		}
+	}
+}
+
+func lastSnapshotColumn(t *testing.T, s *MemSampler) MatrixColumn {
+	t.Helper()
+	cols := s.Snapshot(time.Time{}, time.Time{})
+	if len(cols) == 0 {
+		t.Fatal("snapshot empty")
+	}
+	return cols[len(cols)-1]
+}
+
+func memberRoutesContain(members []uint64, target uint64) bool {
+	for _, m := range members {
+		if m == target {
+			return true
+		}
+	}
+	return false
+}
+
+func setupOneIndividualPlusVirtualBucket(t *testing.T) (*MemSampler, *fakeClock) {
+	t.Helper()
+	s, clk := newTestSampler(t, MemSamplerOptions{
+		Step:             time.Second,
+		HistoryColumns:   4,
+		MaxTrackedRoutes: 1,
+	})
+	if !s.RegisterRoute(1, []byte("a"), []byte("c")) {
+		t.Fatal("route 1 should fit")
+	}
+	if s.RegisterRoute(2, []byte("c"), []byte("e")) {
+		t.Fatal("route 2 should fold (over budget)")
+	}
+	if s.RegisterRoute(3, []byte("e"), []byte("g")) {
+		t.Fatal("route 3 should fold (over budget)")
+	}
+	return s, clk
+}
+
+// TestRegisterDoesNotRaceFlushOnVirtualBucket pins Codex P1: folding
+// a new route into an existing virtual bucket must not race a
+// concurrent Flush iterating the slot's metadata. The -race detector
+// makes the regression observable.
+func TestRegisterDoesNotRaceFlushOnVirtualBucket(t *testing.T) {
+	t.Parallel()
+	s, _ := newTestSampler(t, MemSamplerOptions{
+		Step:             time.Millisecond,
+		HistoryColumns:   1024,
+		MaxTrackedRoutes: 1,
+	})
+	if !s.RegisterRoute(1, []byte("a"), []byte("b")) {
+		t.Fatal("route 1 should fit")
+	}
+	// Seed a virtual bucket so subsequent Registers fold into it.
+	if s.RegisterRoute(2, []byte("c"), []byte("d")) {
+		t.Fatal("route 2 should fold")
+	}
+
+	stop := make(chan struct{})
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for n := 3; ; n++ {
+			select {
+			case <-stop:
+				return
+			default:
+			}
+			s.RegisterRoute(uint64(n), []byte{byte(n)}, []byte{byte(n + 1)}) //nolint:gosec // bounded test loop.
+		}
+	}()
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case <-stop:
+				return
+			default:
+			}
+			s.Flush()
+		}
+	}()
+
+	time.Sleep(50 * time.Millisecond)
+	close(stop)
+	wg.Wait()
+	// No assertion needed — race detector failures are the test signal.
+}
+
+func TestObserveOnUnknownRouteIsNoop(t *testing.T) {
+	t.Parallel()
+	s, _ := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 4})
+	s.Observe(42, OpRead, 0, 0)
+	s.Flush()
+	cols := s.Snapshot(time.Time{}, time.Time{})
+	for _, c := range cols {
+		if len(c.Rows) != 0 {
+			t.Fatalf("unknown-route Observe leaked: %+v", c)
+		}
+	}
+}
+
+// TestConcurrentRegisterAndObserveRace is the -race regression: a
+// concurrent RegisterRoute (COW table publish) must not race with
+// Observe (Load + map lookup against the snapshot).
+func TestConcurrentRegisterAndObserveRace(t *testing.T) {
+	t.Parallel()
+	s, _ := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 8})
+	const routes = 64
+	var observed atomic.Uint64
+
+	stop := make(chan struct{})
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for i := uint64(1); i <= routes; i++ {
+			s.RegisterRoute(i, []byte{byte('a' + i%26)}, []byte{byte('a' + (i+1)%26)})
+			time.Sleep(time.Microsecond)
+		}
+	}()
+	wg.Add(4)
+	for w := 0; w < 4; w++ {
+		go func() {
+			defer wg.Done()
+			for {
+				select {
+				case <-stop:
+					return
+				default:
+				}
+				for i := uint64(1); i <= routes; i++ {
+					s.Observe(i, OpRead, 0, 0)
+					observed.Add(1)
+				}
+			}
+		}()
+	}
+
+	time.Sleep(20 * time.Millisecond)
+	close(stop)
+	wg.Wait()
+
+	if observed.Load() == 0 {
+		t.Fatal("observers ran zero iterations — schedule starvation?")
+	}
+}
+
+func columnTimes(cols []MatrixColumn) []time.Time {
+	out := make([]time.Time, len(cols))
+	for i, c := range cols {
+		out[i] = c.At
+	}
+	return out
+}
+
+// TestRemoveLastVirtualMemberHarvestsBucket pins Codex round-5 P1:
+// removing the last member of a virtual bucket leaves the bucket
+// orphaned in the route table — rebuildSorted no longer reaches it
+// from virtualForRoute, so without the orphan-retire path Flush would
+// silently lose any pre-removal counters the bucket still holds.
+func TestRemoveLastVirtualMemberHarvestsBucket(t *testing.T) {
+	t.Parallel()
+	s, _ := newTestSampler(t, MemSamplerOptions{
+		Step:             time.Second,
+		HistoryColumns:   4,
+		MaxTrackedRoutes: 1,
+	})
+	mustRegister(t, s, 1, "a", "b")
+	if s.RegisterRoute(2, []byte("c"), []byte("d")) {
+		t.Fatal("route 2 over budget should fold into virtual bucket")
+	}
+	s.Observe(2, OpRead, 5, 7)
+	s.RemoveRoute(2)
+	s.Flush()
+
+	rows := lastSnapshotColumn(t, s).Rows
+	agg := findAggregateRow(t, rows)
+	if agg.Reads != 1 || agg.ReadBytes != 12 {
+		t.Fatalf("orphan bucket counts dropped: reads=%d bytes=%d (want 1, 12)",
+			agg.Reads, agg.ReadBytes)
+	}
+}
+
+// TestFlushSortsMixedLiveAndRetiredRows pins the row-ordering
+// invariant when retired-slot drains land in the same column as live
+// slot drains. Without a final sort the retired rows would be appended
+// in queue order, producing a column whose Rows are not monotone by
+// Start.
+func TestFlushSortsMixedLiveAndRetiredRows(t *testing.T) {
+	t.Parallel()
+	s, _ := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 4})
+	mustRegister(t, s, 1, "a", "b")
+	mustRegister(t, s, 2, "m", "n")
+	mustRegister(t, s, 3, "z", "{")
+	s.Observe(2, OpRead, 0, 0)
+	s.RemoveRoute(2)
+	s.Observe(1, OpRead, 0, 0)
+	s.Observe(3, OpRead, 0, 0)
+	s.Flush()
+	flushedRowsSorted(t, s)
+}
+
+// TestRetiredSlotGracePeriod asserts that a retired slot stays in the
+// drain queue for the wall-clock grace window so an Observe goroutine
+// that loaded the pre-RemoveRoute table snapshot can complete its
+// atomic.Add into the slot and still have the increment harvested by
+// a subsequent Flush. We simulate the late writer by reaching into
+// the retired-slot queue directly and bumping the counter between
+// flushes inside the grace window.
+func TestRetiredSlotGracePeriod(t *testing.T) {
+	t.Parallel()
+	s, clk := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 8})
+	if !s.RegisterRoute(1, []byte("a"), []byte("b")) {
+		t.Fatal("RegisterRoute(1) returned false")
+	}
+	s.Observe(1, OpRead, 1, 2)
+	s.RemoveRoute(1)
+
+	lateSlot := graceQueueSingleSlot(t, s)
+	s.Flush() // drain inside grace
+	lateSlot.reads.Add(7)
+	s.Flush() // drain inside grace, harvests the late-writer increment
+
+	total := totalReadsForRoute(s.Snapshot(time.Time{}, time.Time{}), 1)
+	if total != 1+7 {
+		t.Fatalf("expected reads 8 (pre-remove + late-writer), got %d", total)
+	}
+
+	clk.Advance(s.graceWindow() + time.Second)
+	s.Flush() // last drain, retiredAt+grace now passed → entry dropped
+	s.retiredMu.Lock()
+	leftover := len(s.retiredSlots)
+	s.retiredMu.Unlock()
+	if leftover != 0 {
+		t.Fatalf("retired slot not released after grace, len=%d", leftover)
+	}
+}
+
+func graceQueueSingleSlot(t *testing.T, s *MemSampler) *routeSlot {
+	t.Helper()
+	s.retiredMu.Lock()
+	defer s.retiredMu.Unlock()
+	if len(s.retiredSlots) != 1 {
+		t.Fatalf("expected 1 retired slot, got %d", len(s.retiredSlots))
+	}
+	return s.retiredSlots[0].slot
+}
+
+func totalReadsForRoute(cols []MatrixColumn, routeID uint64) uint64 {
+	var total uint64
+	for _, c := range cols {
+		for _, r := range c.Rows {
+			if r.RouteID == routeID {
+				total += r.Reads
+			}
+		}
+	}
+	return total
+}
+
+// TestRegisterFoldLowerStartReorders guards the sortedSlots ordering
+// invariant: when an over-budget route folds into an existing virtual
+// bucket and lowers the bucket's Start, the bucket must be repositioned
+// in sortedSlots so Flush emits matrix rows in key order.
+func TestRegisterFoldLowerStartReorders(t *testing.T) {
+	t.Parallel()
+	s, _ := newTestSampler(t, MemSamplerOptions{
+		Step:             time.Second,
+		HistoryColumns:   4,
+		MaxTrackedRoutes: 2,
+	})
+	mustRegister(t, s, 1, "m", "n")
+	mustRegister(t, s, 2, "p", "q")
+	// Route 3 is over budget — creates a virtual bucket at Start=r.
+	if s.RegisterRoute(3, []byte("r"), []byte("s")) {
+		t.Fatal("route 3 over budget should fold into virtual bucket")
+	}
+	// Route 4 is over budget AND has a Start ("a") below the bucket's
+	// existing Start ("r"). The fold must lower bucket.Start to "a"
+	// AND reposition the bucket within sortedSlots.
+	if s.RegisterRoute(4, []byte("a"), []byte("b")) {
+		t.Fatal("route 4 over budget should fold into virtual bucket")
+	}
+	s.Observe(1, OpRead, 0, 0)
+	s.Observe(2, OpRead, 0, 0)
+	s.Observe(3, OpRead, 0, 0)
+	s.Observe(4, OpRead, 0, 0)
+	s.Flush()
+
+	rows := flushedRowsSorted(t, s)
+	agg := findAggregateRow(t, rows)
+	if string(agg.Start) != "a" {
+		t.Fatalf("aggregate.Start = %q, want %q (fold did not lower Start)", agg.Start, "a")
+	}
+}
+
+func mustRegister(t *testing.T, s *MemSampler, routeID uint64, start, end string) {
+	t.Helper()
+	if !s.RegisterRoute(routeID, []byte(start), []byte(end)) {
+		t.Fatalf("RegisterRoute(%d) returned false", routeID)
+	}
+}
+
+func flushedRowsSorted(t *testing.T, s *MemSampler) []MatrixRow {
+	t.Helper()
+	cols := s.Snapshot(time.Time{}, time.Time{})
+	if len(cols) == 0 || len(cols[0].Rows) == 0 {
+		t.Fatal("no rows after flush")
+	}
+	rows := cols[0].Rows
+	for i := 1; i < len(rows); i++ {
+		if !bytesLE(rows[i-1].Start, rows[i].Start) {
+			t.Fatalf("rows not sorted: rows[%d].Start=%q > rows[%d].Start=%q",
+				i-1, rows[i-1].Start, i, rows[i].Start)
+		}
+	}
+	return rows
+}
+
+// TestSnapshotReturnsDeepCopy guards the public-API contract that the
+// Snapshot result is fully owned by the caller: mutating row bounds or
+// member-route slices must not corrupt later snapshots, and must not
+// race with concurrent Flush/RegisterRoute.
+func TestSnapshotReturnsDeepCopy(t *testing.T) {
+	t.Parallel()
+	s, _ := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 4})
+	if !s.RegisterRoute(1, []byte("aaaa"), []byte("bbbb")) {
+		t.Fatal("RegisterRoute(1) returned false")
+	}
+	s.Observe(1, OpRead, 1, 2)
+	s.Flush()
+
+	first := s.Snapshot(time.Time{}, time.Time{})
+	if len(first) == 0 || len(first[0].Rows) == 0 {
+		t.Fatalf("expected at least one row in first snapshot, got %+v", first)
+	}
+	first[0].Rows[0].Start[0] = 'X'
+	first[0].Rows[0].End[0] = 'Y'
+	first[0].Rows = nil
+
+	second := s.Snapshot(time.Time{}, time.Time{})
+	if len(second) == 0 || len(second[0].Rows) == 0 {
+		t.Fatalf("second snapshot lost rows after caller mutation: %+v", second)
+	}
+	r := second[0].Rows[0]
+	if string(r.Start) != "aaaa" || string(r.End) != "bbbb" {
+		t.Fatalf("snapshot bounds aliased live state: start=%q end=%q", r.Start, r.End)
+	}
+}
+
+// TestVirtualBucketRouteIDIsSynthetic pins Codex round-11 P2: a
+// virtual bucket must not stamp itself with a real route ID, or that
+// real ID could later show up on TWO rows in the same column —
+// once aggregate, once individual — when the original folded route
+// is later re-registered as an individual slot.
+func TestVirtualBucketRouteIDIsSynthetic(t *testing.T) {
+	t.Parallel()
+	s, _ := newTestSampler(t, MemSamplerOptions{
+		Step:             time.Second,
+		HistoryColumns:   4,
+		MaxTrackedRoutes: 1,
+	})
+	mustRegister(t, s, 1, "a", "b")
+	if s.RegisterRoute(2, []byte("c"), []byte("d")) {
+		t.Fatal("route 2 should fold into a fresh virtual bucket")
+	}
+	s.Observe(2, OpRead, 0, 0)
+	s.Flush()
+
+	rows := lastSnapshotColumn(t, s).Rows
+	agg := findAggregateRow(t, rows)
+	if agg.RouteID == 2 {
+		t.Fatalf("aggregate bucket reused real RouteID 2 — synthetic ID space required: %v", agg)
+	}
+	for _, r := range rows {
+		if !r.Aggregate && r.RouteID == agg.RouteID {
+			t.Fatalf("real RouteID %d collided with aggregate row %v", r.RouteID, agg)
+		}
+	}
+}
+
+// TestRejoinAsIndividualLetsBucketPruneFire pins Codex round-11 P2:
+// when a removed virtual member rejoins as an individual slot, the
+// deferred member-prune for the old bucket must still execute —
+// otherwise the bucket's MemberRoutes keeps a route that no longer
+// contributes traffic to it. The test seeds the bucket with two
+// members so that removing one leaves the bucket alive (not orphaned)
+// and traffic keeps draining across the grace boundary; the
+// post-grace row must show the prune actually fired.
+func TestRejoinAsIndividualLetsBucketPruneFire(t *testing.T) {
+	t.Parallel()
+	s, clk := newTestSampler(t, MemSamplerOptions{
+		Step:             time.Second,
+		HistoryColumns:   8,
+		MaxTrackedRoutes: 1,
+	})
+	mustRegister(t, s, 1, "a", "b")
+	if s.RegisterRoute(2, []byte("m"), []byte("n")) {
+		t.Fatal("route 2 should fold")
+	}
+	if s.RegisterRoute(3, []byte("y"), []byte("z")) {
+		t.Fatal("route 3 should fold (same bucket)")
+	}
+	s.Observe(2, OpRead, 0, 0)
+	s.Observe(3, OpRead, 0, 0)
+	// Free capacity (route 1) and remove the virtual member (route 3),
+	// which queues a prune against the bucket. Route 2 still keeps the
+	// bucket alive in virtualForRoute.
+	s.RemoveRoute(1)
+	s.RemoveRoute(3)
+	if !s.RegisterRoute(3, []byte("y"), []byte("z")) {
+		t.Fatal("route 3 should fit individually now")
+	}
+
+	// Two flushes after grace: the first executes the prune, the
+	// second emits a row with the post-prune MemberRoutes.
+	clk.Advance(s.graceWindow() + time.Second)
+	s.Observe(2, OpRead, 0, 0)
+	s.Flush()
+	s.Observe(2, OpRead, 0, 0)
+	s.Flush()
+
+	rows := lastSnapshotColumn(t, s).Rows
+	agg := findAggregateRow(t, rows)
+	if memberRoutesContain(agg.MemberRoutes, 3) {
+		t.Fatalf("bucket still lists pruned route 3: %v", agg.MemberRoutes)
+	}
+	if !memberRoutesContain(agg.MemberRoutes, 2) {
+		t.Fatalf("bucket dropped still-active route 2: %v", agg.MemberRoutes)
+	}
+}
+
+// TestReRegisterIndividualReusesRetiredSlot pins Codex round-12 P2:
+// re-registering the same RouteID inside the grace window must reuse
+// the retired slot. Otherwise Flush emits two rows for the same
+// RouteID in one column (live + retired drain), splitting counts.
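+//
+// The assertions below check both halves of that contract: exactly
+// one row for RouteID 1 in the column, and that row carrying the
+// merged total (pre-remove observation + post-rejoin observation).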
+func TestReRegisterIndividualReusesRetiredSlot(t *testing.T) { + t.Parallel() + s, _ := newTestSampler(t, MemSamplerOptions{ + Step: time.Second, + HistoryColumns: 4, + }) + mustRegister(t, s, 1, "a", "b") + s.Observe(1, OpRead, 0, 0) + s.RemoveRoute(1) + mustRegister(t, s, 1, "a", "b") + s.Observe(1, OpRead, 0, 0) + s.Flush() + + rows := lastSnapshotColumn(t, s).Rows + count := 0 + var total uint64 + for _, r := range rows { + if r.RouteID == 1 { + count++ + total += r.Reads + } + } + if count != 1 { + t.Fatalf("got %d rows for RouteID 1 in one column; expected 1 — retired slot was not reclaimed", count) + } + if total != 2 { + t.Fatalf("total Reads for RouteID 1 = %d, want 2 (1 pre-remove + 1 post-rejoin)", total) + } +} + +// TestReRegisterDuringPruneGraceCancelsPrune pins Codex round-9 P2: +// a virtual-member route that gets removed and re-registered inside +// the prune grace window must not have its routeID stripped from +// bucket.MemberRoutes when grace expires — the route is alive again +// and Observe is feeding fresh traffic into the bucket. +func TestReRegisterDuringPruneGraceCancelsPrune(t *testing.T) { + t.Parallel() + s, clk := newTestSampler(t, MemSamplerOptions{ + Step: time.Second, + HistoryColumns: 4, + MaxTrackedRoutes: 1, + }) + mustRegister(t, s, 1, "a", "b") + if s.RegisterRoute(2, []byte("c"), []byte("d")) { + t.Fatal("route 2 should fold into virtual bucket") + } + s.RemoveRoute(2) + if s.RegisterRoute(2, []byte("c"), []byte("d")) { + t.Fatal("route 2 should still fold (over budget)") + } + + clk.Advance(s.graceWindow() + time.Second) + s.Observe(2, OpRead, 0, 0) + s.Flush() + + rows := lastSnapshotColumn(t, s).Rows + agg := findAggregateRow(t, rows) + if !memberRoutesContain(agg.MemberRoutes, 2) { + t.Fatalf("re-registered route 2 dropped from MemberRoutes after grace: %v", agg.MemberRoutes) + } + count := 0 + for _, m := range agg.MemberRoutes { + if m == 2 { + count++ + } + } + if count != 1 { + t.Fatalf("MemberRoutes contains route 2 %d times, want exactly 1: %v", count, agg.MemberRoutes) + } +} + +// TestNonPositiveOptionsFallBackToDefaults pins Codex round-8 P2: a +// negative MaxTrackedRoutes used to bypass the zero-check and force +// every route into a virtual bucket. Confirm both zero and negative +// inputs land on the documented defaults so a bad CLI/env value +// doesn't silently destroy keyviz fidelity. +func TestNonPositiveOptionsFallBackToDefaults(t *testing.T) { + t.Parallel() + for _, val := range []int{0, -1, -10_000} { + s, _ := newTestSampler(t, MemSamplerOptions{ + Step: time.Second, + HistoryColumns: val, + MaxTrackedRoutes: val, + MaxMemberRoutesPerSlot: val, + }) + if s.opts.HistoryColumns != DefaultHistoryColumns { + t.Fatalf("HistoryColumns=%d → %d, want default %d", val, s.opts.HistoryColumns, DefaultHistoryColumns) + } + if s.opts.MaxTrackedRoutes != DefaultMaxTrackedRoutes { + t.Fatalf("MaxTrackedRoutes=%d → %d, want default %d", val, s.opts.MaxTrackedRoutes, DefaultMaxTrackedRoutes) + } + if s.opts.MaxMemberRoutesPerSlot != DefaultMaxMemberRoutesPerSlot { + t.Fatalf("MaxMemberRoutesPerSlot=%d → %d, want default %d", val, s.opts.MaxMemberRoutesPerSlot, DefaultMaxMemberRoutesPerSlot) + } + } +} + +// TestStepAccessor pins the Step() accessor contract: returns the +// configured Step (after defaulting) for a constructed sampler, and +// returns DefaultStep for a typed nil so callers wiring RunFlusher +// against a disabled sampler don't crash. 
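+//
+// The call shape this protects (sketch): RunFlusher(ctx, s, s.Step()),
+// where s may be a typed-nil *MemSampler when keyviz is disabled.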
+func TestStepAccessor(t *testing.T) { + t.Parallel() + s, _ := newTestSampler(t, MemSamplerOptions{Step: 250 * time.Millisecond}) + if got := s.Step(); got != 250*time.Millisecond { + t.Fatalf("Step() = %v, want 250ms", got) + } + var nilSampler *MemSampler + if got := nilSampler.Step(); got != DefaultStep { + t.Fatalf("nil Step() = %v, want DefaultStep %v", got, DefaultStep) + } +} + +// TestMemberRoutesCappedAtConfiguredCap pins Codex round-7 P2: per- +// bucket MemberRoutes growth is bounded by MaxMemberRoutesPerSlot, so +// flushed columns don't scale with total folded routes when the +// deployment route count vastly exceeds MaxTrackedRoutes. +func TestMemberRoutesCappedAtConfiguredCap(t *testing.T) { + t.Parallel() + s, _ := newTestSampler(t, MemSamplerOptions{ + Step: time.Second, + HistoryColumns: 4, + MaxTrackedRoutes: 1, + MaxMemberRoutesPerSlot: 3, + }) + mustRegister(t, s, 1, "a", "b") + for i := uint64(2); i < 10; i++ { + key := []byte{byte('a' + i)} + if s.RegisterRoute(i, key, append(key, 'z')) { + t.Fatalf("route %d should fold (over budget)", i) + } + s.Observe(i, OpRead, 1, 0) + } + s.Flush() + rows := lastSnapshotColumn(t, s).Rows + agg := findAggregateRow(t, rows) + if len(agg.MemberRoutes) > 3 { + t.Fatalf("MemberRoutes exceeds cap=3: %v", agg.MemberRoutes) + } + // All 8 over-budget routes still drove the bucket counters even + // though only the first 3 are recorded as members. + if agg.Reads != 8 { + t.Fatalf("bucket Reads = %d, want 8 (counters must absorb traffic past cap)", agg.Reads) + } +} + +// TestRetiredTailClearedAfterDrop pins Codex round-7 P2: after a +// retired slot's grace expires and drainRetiredSlots drops the entry, +// the *routeSlot pointer in the dropped tail of the backing array +// must be zeroed so it doesn't keep the slot GC-reachable through +// the reused capacity. +func TestRetiredTailClearedAfterDrop(t *testing.T) { + t.Parallel() + s, clk := newTestSampler(t, MemSamplerOptions{Step: time.Second, HistoryColumns: 4}) + mustRegister(t, s, 1, "a", "b") + s.Observe(1, OpRead, 1, 1) + s.RemoveRoute(1) + + s.retiredMu.Lock() + if len(s.retiredSlots) != 1 { + s.retiredMu.Unlock() + t.Fatalf("expected 1 retired slot pre-flush, got %d", len(s.retiredSlots)) + } + orig := s.retiredSlots[:1:1] // slice header pinning index 0 of the backing array + s.retiredMu.Unlock() + + clk.Advance(s.graceWindow() + time.Second) + s.Flush() + + s.retiredMu.Lock() + leftover := len(s.retiredSlots) + s.retiredMu.Unlock() + if leftover != 0 { + t.Fatalf("expected drain to drop entry, len=%d", leftover) + } + if orig[0].slot != nil { + t.Fatal("dropped retiredSlot.slot pointer not zeroed in backing array — GC retention leak") + } +} + +// BenchmarkObserveHit pins the hot-path properties claimed in the +// package doc: a single atomic.Pointer.Load, a map lookup, and at +// most two atomic.AddUint64 calls — no allocation, no mutex. Use +// `go test -bench=BenchmarkObserveHit -benchmem ./keyviz/...` to +// catch regressions before they reach the coordinator wiring PR. +func BenchmarkObserveHit(b *testing.B) { + s := NewMemSampler(MemSamplerOptions{Step: time.Second, HistoryColumns: 4}) + if !s.RegisterRoute(1, []byte("a"), []byte("b")) { + b.Fatal("RegisterRoute(1) returned false") + } + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.Observe(1, OpRead, 16, 64) + } +} + +// BenchmarkObserveMiss exercises the unknown-route path so a future +// regression that grows allocations on misses (e.g. virtualForRoute +// fallback path) is caught. 
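+//
+// Like BenchmarkObserveHit, run with -benchmem; the expectation (an
+// assumption to pin, not a measured figure) is 0 allocs/op on the
+// miss path as well.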
+func BenchmarkObserveMiss(b *testing.B) { + s := NewMemSampler(MemSamplerOptions{Step: time.Second, HistoryColumns: 4}) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.Observe(99, OpRead, 16, 64) + } +}
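+
+// BenchmarkFlushSteadyState is a hedged companion sketch to the two
+// Observe benchmarks: it measures the per-step cost of Flush draining
+// a populated route table. The 64-route population and the
+// one-Observe-per-iteration cadence are illustrative choices, not
+// figures taken from the design doc.
+func BenchmarkFlushSteadyState(b *testing.B) {
+	s := NewMemSampler(MemSamplerOptions{
+		Step:             time.Second,
+		HistoryColumns:   4,
+		MaxTrackedRoutes: 64,
+	})
+	// Non-overlapping single-byte prefixes keep every route individual.
+	for i := uint64(1); i <= 64; i++ {
+		if !s.RegisterRoute(i, []byte{byte(i)}, []byte{byte(i), 0xff}) {
+			b.Fatalf("RegisterRoute(%d) returned false", i)
+		}
+	}
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		s.Observe(uint64(i%64)+1, OpRead, 16, 64)
+		s.Flush()
+	}
+}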