diff --git a/keyviz/sampler_test.go b/keyviz/sampler_test.go
index da70424d6..f1ed28f92 100644
--- a/keyviz/sampler_test.go
+++ b/keyviz/sampler_test.go
@@ -1093,3 +1093,253 @@ func BenchmarkObserveMiss(b *testing.B) {
 		s.Observe(99, OpRead, 16, 64)
 	}
 }
+
+// BenchmarkObserveParallel pins the contention profile of the hot
+// path. Each goroutine is shard-pinned to a disjoint range of route
+// IDs so per-slot atomic adds genuinely never contend across
+// goroutines and the only shared work is the atomic.Pointer load of
+// the route table. A regression that adds a shared mutex (or a
+// global counter) on the hot path will show up as ns/op refusing
+// to fall (or rising sharply) as parallelism rises.
+//
+// numRoutes is sized for numRoutes / routesPerShard >= 64 disjoint
+// shards so the benchmark stays meaningful up to GOMAXPROCS = 64.
+// The previous draft (numRoutes = 64, routesPerShard = 4) only had
+// 16 shards and silently regressed to shared-counter contention on
+// bigger CI runners, which is exactly the regression class this
+// benchmark exists to detect (Claude bot round-2 P2 on PR #682).
+func BenchmarkObserveParallel(b *testing.B) {
+	const (
+		numRoutes      = 256
+		routesPerShard = 4
+	)
+	s := NewMemSampler(MemSamplerOptions{Step: time.Second, HistoryColumns: 4, MaxTrackedRoutes: numRoutes})
+	for r := uint64(1); r <= numRoutes; r++ {
+		if !s.RegisterRoute(r, []byte{byte(r >> 8), byte(r)}, []byte{byte((r + 1) >> 8), byte(r + 1)}) {
+			b.Fatalf("RegisterRoute(%d) returned false", r)
+		}
+	}
+	var nextShard atomic.Uint64
+	b.ReportAllocs()
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		shardIndex := nextShard.Add(1) - 1
+		shardBase := (shardIndex * routesPerShard) % numRoutes
+		var i uint64
+		for pb.Next() {
+			s.Observe(shardBase+(i%routesPerShard)+1, OpWrite, 16, 64)
+			i++
+		}
+	})
+}
+
+// BenchmarkRegisterRoute pins the route-mutation path: each call
+// takes routesMu, deep-copies the immutable routeTable, mutates, and
+// republishes via atomic.Store. The benchmark holds a fixed-size
+// table (registerBenchTableSize routes pre-loaded) and toggles a
+// route in/out on each iteration so b.N controls the iteration count
+// only, not the table size. The earlier draft made b.N drive both
+// the iteration count and the MaxTrackedRoutes cap, which produced
+// an O(N²) total cost and made ns/op (which Go derives by dividing
+// total time by b.N) read as growing with b.N even though the cost
+// of a single RegisterRoute against a fixed-size table is flat
+// (Gemini medium on PR #682).
+func BenchmarkRegisterRoute(b *testing.B) {
+	const registerBenchTableSize = 1024
+	s := NewMemSampler(MemSamplerOptions{
+		Step:             time.Second,
+		HistoryColumns:   4,
+		MaxTrackedRoutes: registerBenchTableSize + 1,
+	})
+	for r := uint64(1); r <= registerBenchTableSize; r++ {
+		if !s.RegisterRoute(r, []byte{byte(r >> 8), byte(r)}, []byte{byte((r + 1) >> 8), byte(r + 1)}) {
+			b.Fatalf("seed RegisterRoute(%d) returned false", r)
+		}
+	}
+	churnID := uint64(registerBenchTableSize + 1)
+	startKey := []byte{byte(churnID >> 8), byte(churnID & 0xFF)}
+	endKey := []byte{byte((churnID + 1) >> 8), byte((churnID + 1) & 0xFF)}
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		s.RemoveRoute(churnID)
+		if !s.RegisterRoute(churnID, startKey, endKey) {
+			b.Fatalf("RegisterRoute(%d) returned false at i=%d", churnID, i)
+		}
+	}
+}
+
+// BenchmarkFlush pins the per-step drain cost. Flush walks every
+// live slot, swaps out each of its four counters with
+// atomic.SwapUint64, and pushes a new MatrixColumn into the ring
+// buffer. The Observe hot path must not race with this drain (both
+// sides are atomic-only), but Flush itself scales with the live
+// route count; pin its cost so we notice if a future change adds,
+// say, an O(N²) slot scan.
+//
+// b.StopTimer brackets the per-iteration reseed loop so the reported
+// ns/op reflects the Flush cost only. Including the 1024 Observe
+// calls in the timed range inflated the number by ~25% and let a
+// real Flush regression hide behind Observe variance (Gemini medium
+// on PR #682).
+func BenchmarkFlush(b *testing.B) {
+	const numRoutes = 1024
+	clk := &fakeClock{now: time.Unix(1_700_000_000, 0)}
+	s := NewMemSampler(MemSamplerOptions{
+		Step:             time.Second,
+		HistoryColumns:   16,
+		MaxTrackedRoutes: numRoutes,
+		Now:              clk.Now,
+	})
+	for r := uint64(1); r <= numRoutes; r++ {
+		if !s.RegisterRoute(r, []byte{byte(r >> 8), byte(r)}, []byte{byte((r + 1) >> 8), byte(r + 1)}) {
+			b.Fatalf("RegisterRoute(%d) returned false", r)
+		}
+		// Pre-seed every slot with traffic so the first Flush has work to swap.
+		s.Observe(r, OpWrite, 16, 64)
+	}
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		s.Flush()
+		b.StopTimer()
+		clk.Advance(time.Second)
+		// Reseed so the next Flush still has counters to drain.
+		for r := uint64(1); r <= numRoutes; r++ {
+			s.Observe(r, OpWrite, 16, 64)
+		}
+		b.StartTimer()
+	}
+}
+
+// BenchmarkSnapshot pins the read-side pivot cost. Snapshot copies
+// every column in the requested window into freshly allocated
+// MatrixRows so the caller can freely mutate the result. With
+// numRoutes×numColumns cells the pivot is O(N×C) and its cost is the
+// dominant term in the admin handler's response latency.
+func BenchmarkSnapshot(b *testing.B) {
+	const (
+		numRoutes  = 1024
+		numColumns = 64
+	)
+	clk := &fakeClock{now: time.Unix(1_700_000_000, 0)}
+	s := NewMemSampler(MemSamplerOptions{
+		Step:             time.Second,
+		HistoryColumns:   numColumns,
+		MaxTrackedRoutes: numRoutes,
+		Now:              clk.Now,
+	})
+	for r := uint64(1); r <= numRoutes; r++ {
+		if !s.RegisterRoute(r, []byte{byte(r >> 8), byte(r)}, []byte{byte((r + 1) >> 8), byte(r + 1)}) {
+			b.Fatalf("RegisterRoute(%d) returned false", r)
+		}
+	}
+	for c := 0; c < numColumns; c++ {
+		for r := uint64(1); r <= numRoutes; r++ {
+			s.Observe(r, OpWrite, 16, 64)
+		}
+		s.Flush()
+		clk.Advance(time.Second)
+	}
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		_ = s.Snapshot(time.Time{}, time.Time{})
+	}
+}
+
+// TestObserveExactCountUnderConcurrentBurst pins the Phase 2-A
+// "no counts lost" invariant under the kind of workload §5.2 calls
+// out as the SLO target: many goroutines hammering many routes
+// simultaneously. With sub-sampling not yet implemented (sampleRate
+// = 1 always), every Observe must be reflected exactly in the
+// post-Flush Snapshot. When sub-sampling lands and this invariant
+// no longer holds, this test must be updated alongside the new
+// estimator-based ±5% / 95%-CI assertion described in §5.2.
+func TestObserveExactCountUnderConcurrentBurst(t *testing.T) {
+	t.Parallel()
+	const (
+		numRoutes       = 32
+		writersPerRoute = 8
+		opsPerWriter    = 4_000
+		keyLen          = 16
+		valueLen        = 64
+	)
+	s, clk := newTestSampler(t, MemSamplerOptions{
+		Step:             time.Second,
+		HistoryColumns:   8,
+		MaxTrackedRoutes: numRoutes,
+	})
+	for r := uint64(1); r <= numRoutes; r++ {
+		if !s.RegisterRoute(r, []byte{byte(r)}, []byte{byte(r) + 1}) {
+			t.Fatalf("RegisterRoute(%d) returned false", r)
+		}
+	}
+
+	runConcurrentBurst(s, numRoutes, writersPerRoute, opsPerWriter, keyLen, valueLen)
+	clk.Advance(time.Second)
+	s.Flush()
+
+	cols := s.Snapshot(time.Time{}, time.Time{})
+	if len(cols) != 1 {
+		t.Fatalf("expected 1 column after a single Flush, got %d", len(cols))
+	}
+	const expectedPerRoute = uint64(writersPerRoute * opsPerWriter)
+	const expectedBytesPerRoute = expectedPerRoute * uint64(keyLen+valueLen)
+
+	// Codex P1 on PR #682: assert every registered route appears in
+	// the column. Without this index, a future Flush regression that
+	// silently drops a route's row would still pass the per-row
+	// counter checks below; the loop would just iterate fewer times.
+	rowsByRoute := make(map[uint64]MatrixRow, numRoutes)
+	for _, row := range cols[0].Rows {
+		rowsByRoute[row.RouteID] = row
+	}
+	for r := uint64(1); r <= numRoutes; r++ {
+		row, ok := rowsByRoute[r]
+		if !ok {
+			t.Errorf("route %d: missing from Snapshot rows; Flush must not silently drop a registered route under burst", r)
+			continue
+		}
+		if row.Writes != expectedPerRoute {
+			t.Errorf("route %d: writes = %d, want exactly %d (no counts must be lost under concurrent burst)",
+				row.RouteID, row.Writes, expectedPerRoute)
+		}
+		if row.WriteBytes != expectedBytesPerRoute {
+			t.Errorf("route %d: writeBytes = %d, want exactly %d",
+				row.RouteID, row.WriteBytes, expectedBytesPerRoute)
+		}
+	}
+}
+
+// runConcurrentBurst spawns numRoutes×writersPerRoute goroutines and
+// releases them simultaneously so every route sees genuinely
+// concurrent Observe traffic. Returns once every writer has finished.
+//
+// numRoutes is uint64 so the loop iterates in the same type the
+// sampler uses for RouteID; the earlier `int` form needed a
+// per-iteration `uint64(r)` cast that triggered gosec G115 and a
+// nolint suppression that CLAUDE.md says to refactor away rather
+// than keep (Claude bot round-2 P1 on PR #682).
+func runConcurrentBurst(s *MemSampler, numRoutes uint64, writersPerRoute, opsPerWriter, keyLen, valueLen int) {
+	var ready, start, done sync.WaitGroup
+	total := int(numRoutes) * writersPerRoute
+	ready.Add(total)
+	done.Add(total)
+	start.Add(1)
+	for r := uint64(1); r <= numRoutes; r++ {
+		routeID := r
+		for w := 0; w < writersPerRoute; w++ {
+			go func() {
+				defer done.Done()
+				ready.Done()
+				start.Wait()
+				for op := 0; op < opsPerWriter; op++ {
+					s.Observe(routeID, OpWrite, keyLen, valueLen)
+				}
+			}()
+		}
+	}
+	ready.Wait()
+	start.Done()
+	done.Wait()
+}