test(keyviz): Phase 2-A bench coverage + concurrent-burst exact-count test #682
@@ -1093,3 +1093,253 @@ func BenchmarkObserveMiss(b *testing.B) {
		s.Observe(99, OpRead, 16, 64)
	}
}

// BenchmarkObserveParallel pins the contention profile of the hot
// path. Each goroutine is shard-pinned to a disjoint range of route
// IDs so per-slot atomic adds genuinely never contend across
// goroutines and the only shared work is the atomic.Pointer load of
// the route table. A regression that adds a shared mutex (or a
// global counter) on the hot path will show up as a sharp rise in
// ns/op as parallelism increases.
//
// numRoutes is sized for numRoutes / routesPerShard >= 64
// disjoint shards so the benchmark stays meaningful up to
// GOMAXPROCS = 64. The previous draft (numRoutes = 64,
// routesPerShard = 4) only had 16 shards and silently regressed
// to shared-counter contention on bigger CI runners, the very
// regression class this benchmark exists to detect (Claude bot
// round-2 P2 on PR #682).
func BenchmarkObserveParallel(b *testing.B) {
	const (
		numRoutes      = 256
		routesPerShard = 4
	)
	s := NewMemSampler(MemSamplerOptions{Step: time.Second, HistoryColumns: 4, MaxTrackedRoutes: numRoutes})
	for r := uint64(1); r <= numRoutes; r++ {
		if !s.RegisterRoute(r, []byte{byte(r >> 8), byte(r)}, []byte{byte((r + 1) >> 8), byte(r + 1)}) {
			b.Fatalf("RegisterRoute(%d) returned false", r)
		}
	}
	var nextShard atomic.Uint64
	b.ReportAllocs()
	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		shardIndex := nextShard.Add(1) - 1
		shardBase := (shardIndex * routesPerShard) % numRoutes
		var i uint64
		for pb.Next() {
			s.Observe(shardBase+(i%routesPerShard)+1, OpWrite, 16, 64)
			i++
		}
	})

Comment on lines +1126 to +1134 (Contributor):

The implementation of …

	var gid atomic.Uint64
	b.RunParallel(func(pb *testing.PB) {
		offset := gid.Add(1)
		var i uint64
		for pb.Next() {
			s.Observe(((offset+i)%numRoutes)+1, OpWrite, 16, 64)
			i++
		}
	})

}

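For context on the shard sizing discussed in the doc comment above and in the review thread below, here is a standalone sketch (hypothetical program, not part of this PR) of the shard arithmetic: with numRoutes = 256 and routesPerShard = 4, the first numRoutes/routesPerShard = 64 goroutines land on pairwise-disjoint route ranges, and the 65th goroutine (shardIndex 64) is the first to wrap back onto shard 0.

	// Illustrative only: prints which routes get claimed by more than one
	// shard once shardIndex exceeds numRoutes/routesPerShard.
	package main

	import "fmt"

	func main() {
		const (
			numRoutes      = 256
			routesPerShard = 4
		)
		owner := map[uint64]uint64{} // route ID -> first shard index that claimed it
		for shardIndex := uint64(0); shardIndex < 66; shardIndex++ {
			shardBase := (shardIndex * routesPerShard) % numRoutes
			for i := uint64(0); i < routesPerShard; i++ {
				route := shardBase + i + 1
				if first, dup := owner[route]; dup {
					fmt.Printf("shard %d wraps onto route %d (first claimed by shard %d)\n",
						shardIndex, route, first)
				} else {
					owner[route] = shardIndex
				}
			}
		}
	}
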
// BenchmarkRegisterRoute pins the route-mutation path: each call
// takes routesMu, deep-copies the immutable routeTable, mutates, and
// republishes via atomic.Store. The benchmark holds a fixed-size
// table (registerBenchTableSize routes pre-loaded) and toggles a
// route in/out on each iteration so b.N controls the iteration count
// only, not the table size. The earlier draft made b.N drive both
// the iteration count and the MaxTrackedRoutes cap, which produced an
// O(N²) total cost and made ns/op (which Go derives by dividing total
// time by b.N) read as growing with N even though the steady-state
// per-call cost at a fixed table size is constant (Gemini medium on
// PR #682).
func BenchmarkRegisterRoute(b *testing.B) {
	const registerBenchTableSize = 1024
	s := NewMemSampler(MemSamplerOptions{
		Step:             time.Second,
		HistoryColumns:   4,
		MaxTrackedRoutes: registerBenchTableSize + 1,
	})
	for r := uint64(1); r <= registerBenchTableSize; r++ {
		if !s.RegisterRoute(r, []byte{byte(r >> 8), byte(r)}, []byte{byte((r + 1) >> 8), byte(r + 1)}) {
			b.Fatalf("seed RegisterRoute(%d) returned false", r)
		}
	}
	churnID := uint64(registerBenchTableSize + 1)
	startKey := []byte{byte(churnID >> 8), byte(churnID & 0xFF)}
	endKey := []byte{byte((churnID + 1) >> 8), byte((churnID + 1) & 0xFF)}
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		s.RemoveRoute(churnID)
		if !s.RegisterRoute(churnID, startKey, endKey) {
			b.Fatalf("RegisterRoute(%d) returned false at i=%d", churnID, i)
		}
	}
}

|
Comment on lines
+1147
to
+1170
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This benchmark has |
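A minimal standalone sketch (hypothetical, not from this PR, written as an ordinary _test.go benchmark) of the anti-pattern the doc comment above describes: when b.N also sizes the data, each iteration's work grows with i, total work is roughly 0 + 1 + … + (b.N-1) ≈ b.N²/2, and the reported ns/op (total time divided by b.N) climbs with b.N even though nothing about the per-call algorithm changed.

	// Illustrative only: the deep copy scales with the table built so far,
	// so ns/op appears to grow with b.N.
	func BenchmarkGrowingTableAntiPattern(b *testing.B) {
		table := make([]int, 0, b.N)
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			table = append(table, i)
			snapshot := make([]int, len(table))
			copy(snapshot, table) // copy-on-write republish, sized by the growing table
		}
	}
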
// BenchmarkFlush pins the per-step drain cost. Flush walks every
// live slot, atomic.SwapUint64s its four counters, and pushes a new
// MatrixColumn into the ring buffer. The hot path Observe must not
// race with this drain (atomic-only for both sides), but Flush itself
// scales with the live route count; pin its cost so we notice if a
// future change adds, say, an O(N²) slot scan.
//
// b.StopTimer brackets the per-iteration reseed loop so the reported
// ns/op reflects the Flush cost only. Including the 1024 Observe
// calls in the timed range inflated the number by ~25% and let a
// real Flush regression hide behind Observe variance (Gemini medium
// on PR #682).
func BenchmarkFlush(b *testing.B) {
	const numRoutes = 1024
	clk := &fakeClock{now: time.Unix(1_700_000_000, 0)}
	s := NewMemSampler(MemSamplerOptions{
		Step:             time.Second,
		HistoryColumns:   16,
		MaxTrackedRoutes: numRoutes,
		Now:              clk.Now,
	})
	for r := uint64(1); r <= numRoutes; r++ {
		if !s.RegisterRoute(r, []byte{byte(r >> 8), byte(r)}, []byte{byte((r + 1) >> 8), byte(r + 1)}) {
			b.Fatalf("RegisterRoute(%d) returned false", r)
		}
		// Pre-seed every slot with traffic so the first Flush has work to swap.
		s.Observe(r, OpWrite, 16, 64)
	}
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		s.Flush()
		b.StopTimer()
		clk.Advance(time.Second)
		// Reseed so the next Flush still has counters to drain.
		for r := uint64(1); r <= numRoutes; r++ {
			s.Observe(r, OpWrite, 16, 64)
		}
		b.StartTimer()
	}

Comment on lines +1202 to +1211 (Contributor):

The …

	for i := 0; i < b.N; i++ {
		s.Flush()
		clk.Advance(time.Second)
		b.StopTimer()
		// Reseed so the next Flush still has counters to drain.
		for r := uint64(1); r <= numRoutes; r++ {
			s.Observe(r, OpWrite, 16, 64)
		}
		b.StartTimer()
	}

}

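fakeClock and its Advance helper are defined elsewhere in this test file; the benchmarks above rely only on the shape sketched here. The sketch is an assumption inferred from the call sites (struct literal with a now field, a Now method passed as MemSamplerOptions.Now, and Advance), not the actual implementation.

	// Sketch only: the real fakeClock may differ in detail.
	type fakeClock struct {
		mu  sync.Mutex
		now time.Time
	}

	func (c *fakeClock) Now() time.Time {
		c.mu.Lock()
		defer c.mu.Unlock()
		return c.now
	}

	func (c *fakeClock) Advance(d time.Duration) {
		c.mu.Lock()
		c.now = c.now.Add(d)
		c.mu.Unlock()
	}
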
// BenchmarkSnapshot pins the read-side pivot cost. Snapshot copies
// every column in the requested window into freshly allocated
// MatrixRows so the caller can freely mutate the result. With
// numRoutes×numColumns cells the pivot is O(N×C) and its cost is the
// dominant term in the admin handler's response latency.
func BenchmarkSnapshot(b *testing.B) {
	const (
		numRoutes  = 1024
		numColumns = 64
	)
	clk := &fakeClock{now: time.Unix(1_700_000_000, 0)}
	s := NewMemSampler(MemSamplerOptions{
		Step:             time.Second,
		HistoryColumns:   numColumns,
		MaxTrackedRoutes: numRoutes,
		Now:              clk.Now,
	})
	for r := uint64(1); r <= numRoutes; r++ {
		if !s.RegisterRoute(r, []byte{byte(r >> 8), byte(r)}, []byte{byte((r + 1) >> 8), byte(r + 1)}) {
			b.Fatalf("RegisterRoute(%d) returned false", r)
		}
	}
	for c := 0; c < numColumns; c++ {
		for r := uint64(1); r <= numRoutes; r++ {
			s.Observe(r, OpWrite, 16, 64)
		}
		s.Flush()
		clk.Advance(time.Second)
	}
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_ = s.Snapshot(time.Time{}, time.Time{})
	}
}

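For orientation, a hedged sketch of how a caller might consume the Snapshot result. It uses only the fields the test below reads (Rows on the column, RouteID and WriteBytes on MatrixRow) and passes zero time.Time bounds exactly as the benchmark does; the helper name and anything beyond those fields are assumptions.

	// Illustrative only: sums per-route write traffic across the returned window.
	func totalWriteBytes(s *MemSampler) map[uint64]uint64 {
		totals := make(map[uint64]uint64)
		for _, col := range s.Snapshot(time.Time{}, time.Time{}) {
			for _, row := range col.Rows {
				totals[row.RouteID] += row.WriteBytes
			}
		}
		return totals
	}
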
// TestObserveExactCountUnderConcurrentBurst pins the Phase 2-A
// "no counts lost" invariant under the kind of workload §5.2 calls
// out as the SLO target: many goroutines hammering many routes
// simultaneously. With sub-sampling not yet implemented (sampleRate
// = 1 always), every Observe must be reflected exactly in the
// post-Flush Snapshot. When sub-sampling lands and this invariant
// no longer holds, this test must be updated alongside the new
// estimator-based ±5% / 95%-CI assertion described in §5.2.
func TestObserveExactCountUnderConcurrentBurst(t *testing.T) {
	t.Parallel()
	const (
		numRoutes       = 32
		writersPerRoute = 8
		opsPerWriter    = 4_000
		keyLen          = 16
		valueLen        = 64
	)
	s, clk := newTestSampler(t, MemSamplerOptions{
		Step:             time.Second,
		HistoryColumns:   8,
		MaxTrackedRoutes: numRoutes,
	})
	for r := uint64(1); r <= numRoutes; r++ {
		if !s.RegisterRoute(r, []byte{byte(r)}, []byte{byte(r) + 1}) {
			t.Fatalf("RegisterRoute(%d) returned false", r)
		}
	}

	runConcurrentBurst(s, numRoutes, writersPerRoute, opsPerWriter, keyLen, valueLen)
	clk.Advance(time.Second)
	s.Flush()

	cols := s.Snapshot(time.Time{}, time.Time{})
	if len(cols) != 1 {
		t.Fatalf("expected 1 column after a single Flush, got %d", len(cols))
	}
	const expectedPerRoute = uint64(writersPerRoute * opsPerWriter)
	const expectedBytesPerRoute = expectedPerRoute * uint64(keyLen+valueLen)
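	// With these constants that works out to 8 × 4,000 = 32,000 writes per
	// route and 32,000 × (16 + 64) = 2,560,000 write bytes per route.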

	// Codex P1 on PR #682: assert every registered route appears in
	// the column. Without this index, a future Flush regression that
	// silently drops a route's row would still pass the per-row
	// counter checks below; the loop would just iterate fewer times.
	rowsByRoute := make(map[uint64]MatrixRow, numRoutes)
	for _, row := range cols[0].Rows {

Comment: This test only validates rows that appear in …

		rowsByRoute[row.RouteID] = row
	}
	for r := uint64(1); r <= numRoutes; r++ {
		row, ok := rowsByRoute[r]
		if !ok {
			t.Errorf("route %d: missing from Snapshot rows; Flush must not silently drop a registered route under burst", r)
			continue
		}
		if row.Writes != expectedPerRoute {
			t.Errorf("route %d: writes = %d, want exactly %d (no counts must be lost under concurrent burst)",
				row.RouteID, row.Writes, expectedPerRoute)
		}
		if row.WriteBytes != expectedBytesPerRoute {
			t.Errorf("route %d: writeBytes = %d, want exactly %d",
				row.RouteID, row.WriteBytes, expectedBytesPerRoute)
		}
	}
}

// runConcurrentBurst spawns numRoutes×writersPerRoute goroutines and
// releases them simultaneously so every route sees genuinely
// concurrent Observe traffic. Returns once every writer has finished.
//
// numRoutes is uint64 so the loop iterates in the same type the
// sampler uses for RouteID; the earlier `int` form needed a
// per-iteration `uint64(r)` cast that triggered gosec G115 and a
// nolint suppression, which CLAUDE.md tells us to refactor away
// rather than suppress (Claude bot round-2 P1 on PR #682).
func runConcurrentBurst(s *MemSampler, numRoutes uint64, writersPerRoute, opsPerWriter, keyLen, valueLen int) {
	var ready, start, done sync.WaitGroup
	total := int(numRoutes) * writersPerRoute
	ready.Add(total)
	done.Add(total)
	start.Add(1)
	for r := uint64(1); r <= numRoutes; r++ {
		routeID := r
		for w := 0; w < writersPerRoute; w++ {
			go func() {
				defer done.Done()
				ready.Done()
				start.Wait()
				for op := 0; op < opsPerWriter; op++ {
					s.Observe(routeID, OpWrite, keyLen, valueLen)
				}
			}()
		}
	}
	ready.Wait()
	start.Done()
	done.Wait()
}

Comment: BenchmarkObserveParallel still reintroduces cross-goroutine counter contention when RunParallel launches more than 16 workers (its default is GOMAXPROCS): with numRoutes = 64 and routesPerShard = 4, shardBase := (shardIndex * routesPerShard) % numRoutes wraps and assigns multiple workers to the same 4-route shard. In that environment the benchmark no longer isolates the shared table.Load cost it claims to pin, so ns/op becomes host-core-count dependent and can mask or blur hot-path regressions.