Commit 3f35b68

test(keyviz): Phase 2-A bench coverage + concurrent-burst exact-count test (#682)
## Summary

Closes the "Benchmark gate green" half of the Phase 2-A exit criteria in `docs/admin_ui_key_visualizer_design.md` §12. Pure-test addition; no production code changes.

Four new benchmarks pin the cost shape of the `keyviz` package:

| Benchmark | What it pins | Apple M1 Max @ `-benchtime=200ms` |
|---|---|---|
| `BenchmarkObserveParallel` | Hot-path contention — distinct slot per goroutine; the only shared work is the table `atomic.Pointer.Load` | 20.6 ns/op, 0 allocs |
| `BenchmarkRegisterRoute` | COW route-table mutation cost; table-size-independent (flat) ns/op would signal a shift to a shared mutable map | 123.9 µs/op, 24 allocs |
| `BenchmarkFlush` | Per-step drain over 1024 live routes; catches O(N²) scan regressions | 96.4 µs/op |
| `BenchmarkSnapshot` | Read-side pivot over 1024 routes × 64 columns; dominant term in admin handler latency | 1.86 ms/op |

`TestObserveExactCountUnderConcurrentBurst` — 32 routes × 8 writers × 4000 ops, gated on a `sync.WaitGroup` so every goroutine starts simultaneously — pins the Phase 2-A invariant that no counts are lost under concurrent burst: `sampleRate = 1` today, so every `Observe` must be reflected exactly in the post-Flush Snapshot. When the design §5.2 sub-sampling estimator lands, this test must be updated alongside the new ±5% / 95%-CI assertion.

## Why not the ±5% accuracy SLO test?

Per design §5.2, the SLO is for the Horvitz–Thompson estimator that runs when the adaptive controller raises `sampleRate` above 1 under burst. **That controller is not yet implemented** — the current code path always counts every Observe — so a ±5% / 95%-CI assertion would be testing the absence of a feature. This PR pins the current "exact counting" invariant instead; the SLO test lands with the sub-sampling implementation in a follow-up that updates §5.2 to reflect the chosen controller shape. (A sketch of the estimator shape appears after this message.)

## Five-lens self-review

1. **Data loss** — n/a; tests only.
2. **Concurrency / distributed** — the burst test is the concurrency check; it runs clean under the race detector (`go test -race`).
3. **Performance** — adds CI-runnable benchmarks; baseline ns/op recorded above.
4. **Data consistency** — pins the exact-counting invariant explicitly so a regression that loses counts on the hot path is caught.
5. **Test coverage** — the burst test plus four benchmarks are the new coverage. Existing tests unchanged.

## Test plan

- [x] `go test -race -count=1 ./keyviz/...` — clean
- [x] `go test -bench='Benchmark(ObserveParallel|RegisterRoute|Flush|Snapshot)$' -benchtime=200ms -run='^$' ./keyviz/...` — runs and reports the numbers above
- [x] `golangci-lint run ./keyviz/...` — clean

## Summary by CodeRabbit

* **Tests**
  * Added performance benchmarks for concurrent operations, route registration, flush operations, and snapshot functionality
  * Added concurrency correctness test to verify data integrity and accurate counting under concurrent load
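For context on the estimator discussed under "Why not the ±5% accuracy SLO test?": under Bernoulli sub-sampling at rate p = 1/sampleRate, the Horvitz–Thompson estimate of a route's true count N is the raw recorded count divided by p, with Var(N̂) = N(1−p)/p. The following is a minimal Go sketch of what the §5.2-shaped assertion could look like once a controller exists — `estimateCount` and `assertCloseToTruth` are hypothetical names invented for illustration, not the `keyviz` API:

```go
package keyviz

import (
	"math"
	"testing"
)

// Hypothetical sketch — the sub-sampling controller is not implemented
// yet; estimateCount and assertCloseToTruth are illustrative names only.

// estimateCount is the Horvitz–Thompson estimate under Bernoulli
// sub-sampling: each Observe is recorded with probability 1/sampleRate,
// so scaling the raw recorded count back up by sampleRate is unbiased.
func estimateCount(raw uint64, sampleRate float64) float64 {
	return float64(raw) * sampleRate
}

// assertCloseToTruth sketches the §5.2-shaped check: the raw count is
// Binomial(truth, p), so Var(estimate) = truth·(1−p)/p and the
// normal-approximation 95% CI half-width is 1.96·√(truth·(1−p)/p).
func assertCloseToTruth(t *testing.T, estimate, truth, sampleRate float64) {
	t.Helper()
	p := 1 / sampleRate
	halfWidth := 1.96 * math.Sqrt(truth*(1-p)/p)
	tolerance := math.Max(0.05*truth, halfWidth) // ±5% band, CI-widened
	if math.Abs(estimate-truth) > tolerance {
		t.Errorf("estimate %.0f outside ±%.0f of truth %.0f", estimate, tolerance, truth)
	}
}
```

Taking the larger of the ±5% band and the 95% CI half-width keeps small true counts from failing on pure sampling noise.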
2 parents df5ea5e + 3310a1a commit 3f35b68

1 file changed: keyviz/sampler_test.go (250 additions & 0 deletions)
All 250 changed lines are additions to `keyviz/sampler_test.go`, appended after the existing `BenchmarkObserveMiss` (hunk `@@ -1093,3 +1093,253 @@`):

```go
// BenchmarkObserveParallel pins the contention profile of the hot
// path. Each goroutine is shard-pinned to a disjoint range of route
// IDs so per-slot atomic adds genuinely never contend across
// goroutines and the only shared work is the atomic.Pointer load of
// the route table. A regression that adds a shared mutex (or a
// global counter) on the hot path will show up as ns/op climbing
// sharply as parallelism rises.
//
// numRoutes is sized for numRoutes / routesPerShard >= 64
// disjoint shards so the benchmark stays meaningful up to
// GOMAXPROCS = 64. The previous draft (numRoutes = 64,
// routesPerShard = 4) only had 16 shards and silently regressed
// to shared-counter contention on bigger CI runners — the very
// regression class this benchmark exists to detect (Claude bot
// round-2 P2 on PR #682).
func BenchmarkObserveParallel(b *testing.B) {
	const (
		numRoutes      = 256
		routesPerShard = 4
	)
	s := NewMemSampler(MemSamplerOptions{Step: time.Second, HistoryColumns: 4, MaxTrackedRoutes: numRoutes})
	for r := uint64(1); r <= numRoutes; r++ {
		if !s.RegisterRoute(r, []byte{byte(r >> 8), byte(r)}, []byte{byte((r + 1) >> 8), byte(r + 1)}) {
			b.Fatalf("RegisterRoute(%d) returned false", r)
		}
	}
	var nextShard atomic.Uint64
	b.ReportAllocs()
	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		shardIndex := nextShard.Add(1) - 1
		shardBase := (shardIndex * routesPerShard) % numRoutes
		var i uint64
		for pb.Next() {
			s.Observe(shardBase+(i%routesPerShard)+1, OpWrite, 16, 64)
			i++
		}
	})
}

// BenchmarkRegisterRoute pins the route-mutation path: each call
// takes routesMu, deep-copies the immutable routeTable, mutates, and
// republishes via atomic.Store. The benchmark holds a fixed-size
// table (registerBenchTableSize routes pre-loaded) and toggles a
// route in/out on each iteration so b.N controls the iteration count
// only — not the table size. The earlier draft made b.N drive both
// the iteration count and the MaxTrackedRoutes cap, which produced an
// O(N²) total cost and made ns/op (which Go derives by dividing total
// time by b.N) read as growing-with-N even though per-call cost at a
// fixed table size is constant (Gemini medium on PR #682).
func BenchmarkRegisterRoute(b *testing.B) {
	const registerBenchTableSize = 1024
	s := NewMemSampler(MemSamplerOptions{
		Step:             time.Second,
		HistoryColumns:   4,
		MaxTrackedRoutes: registerBenchTableSize + 1,
	})
	for r := uint64(1); r <= registerBenchTableSize; r++ {
		if !s.RegisterRoute(r, []byte{byte(r >> 8), byte(r)}, []byte{byte((r + 1) >> 8), byte(r + 1)}) {
			b.Fatalf("seed RegisterRoute(%d) returned false", r)
		}
	}
	churnID := uint64(registerBenchTableSize + 1)
	startKey := []byte{byte(churnID >> 8), byte(churnID & 0xFF)}
	endKey := []byte{byte((churnID + 1) >> 8), byte((churnID + 1) & 0xFF)}
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		s.RemoveRoute(churnID)
		if !s.RegisterRoute(churnID, startKey, endKey) {
			b.Fatalf("RegisterRoute(%d) returned false at i=%d", churnID, i)
		}
	}
}

// BenchmarkFlush pins the per-step drain cost. Flush walks every
// live slot, atomic.SwapUint64s its four counters, and pushes a new
// MatrixColumn into the ring buffer. The hot path Observe must not
// race with this drain (atomic-only for both sides), but Flush itself
// scales with the live route count — pin its cost so we notice if
// a future change adds e.g. an O(N²) slot scan.
//
// b.StopTimer brackets the per-iteration reseed loop so the reported
// ns/op reflects the Flush cost only. Including the 1024 Observe
// calls in the timed range inflated the number by ~25% and let a
// real Flush regression hide behind Observe variance (Gemini medium
// on PR #682).
func BenchmarkFlush(b *testing.B) {
	const numRoutes = 1024
	clk := &fakeClock{now: time.Unix(1_700_000_000, 0)}
	s := NewMemSampler(MemSamplerOptions{
		Step:             time.Second,
		HistoryColumns:   16,
		MaxTrackedRoutes: numRoutes,
		Now:              clk.Now,
	})
	for r := uint64(1); r <= numRoutes; r++ {
		if !s.RegisterRoute(r, []byte{byte(r >> 8), byte(r)}, []byte{byte((r + 1) >> 8), byte(r + 1)}) {
			b.Fatalf("RegisterRoute(%d) returned false", r)
		}
		// Pre-seed every slot with traffic so the first Flush has work to swap.
		s.Observe(r, OpWrite, 16, 64)
	}
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		s.Flush()
		b.StopTimer()
		clk.Advance(time.Second)
		// Reseed so the next Flush still has counters to drain.
		for r := uint64(1); r <= numRoutes; r++ {
			s.Observe(r, OpWrite, 16, 64)
		}
		b.StartTimer()
	}
}

// BenchmarkSnapshot pins the read-side pivot cost. Snapshot copies
// every column in the requested window into freshly allocated
// MatrixRows so the caller can freely mutate the result. With
// numRoutes×numColumns cells the pivot is O(N×C) and its cost is the
// dominant term in the admin handler's response latency.
func BenchmarkSnapshot(b *testing.B) {
	const (
		numRoutes  = 1024
		numColumns = 64
	)
	clk := &fakeClock{now: time.Unix(1_700_000_000, 0)}
	s := NewMemSampler(MemSamplerOptions{
		Step:             time.Second,
		HistoryColumns:   numColumns,
		MaxTrackedRoutes: numRoutes,
		Now:              clk.Now,
	})
	for r := uint64(1); r <= numRoutes; r++ {
		if !s.RegisterRoute(r, []byte{byte(r >> 8), byte(r)}, []byte{byte((r + 1) >> 8), byte(r + 1)}) {
			b.Fatalf("RegisterRoute(%d) returned false", r)
		}
	}
	for c := 0; c < numColumns; c++ {
		for r := uint64(1); r <= numRoutes; r++ {
			s.Observe(r, OpWrite, 16, 64)
		}
		s.Flush()
		clk.Advance(time.Second)
	}
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_ = s.Snapshot(time.Time{}, time.Time{})
	}
}

// TestObserveExactCountUnderConcurrentBurst pins the Phase 2-A
// "no counts lost" invariant under the kind of workload §5.2 calls
// out as the SLO target: many goroutines hammering many routes
// simultaneously. With sub-sampling not yet implemented (sampleRate
// = 1 always), every Observe must be reflected exactly in the
// post-Flush Snapshot. When sub-sampling lands and this invariant
// no longer holds, this test must be updated alongside the new
// estimator-based ±5% / 95%-CI assertion described in §5.2.
func TestObserveExactCountUnderConcurrentBurst(t *testing.T) {
	t.Parallel()
	const (
		numRoutes       = 32
		writersPerRoute = 8
		opsPerWriter    = 4_000
		keyLen          = 16
		valueLen        = 64
	)
	s, clk := newTestSampler(t, MemSamplerOptions{
		Step:             time.Second,
		HistoryColumns:   8,
		MaxTrackedRoutes: numRoutes,
	})
	for r := uint64(1); r <= numRoutes; r++ {
		if !s.RegisterRoute(r, []byte{byte(r)}, []byte{byte(r) + 1}) {
			t.Fatalf("RegisterRoute(%d) returned false", r)
		}
	}

	runConcurrentBurst(s, numRoutes, writersPerRoute, opsPerWriter, keyLen, valueLen)
	clk.Advance(time.Second)
	s.Flush()

	cols := s.Snapshot(time.Time{}, time.Time{})
	if len(cols) != 1 {
		t.Fatalf("expected 1 column after a single Flush, got %d", len(cols))
	}
	const expectedPerRoute = uint64(writersPerRoute * opsPerWriter)
	const expectedBytesPerRoute = expectedPerRoute * uint64(keyLen+valueLen)

	// Codex P1 on PR #682: assert every registered route appears in
	// the column. Without this index, a future Flush regression that
	// silently drops a route's row would still pass the per-row
	// counter checks below — the loop would just iterate fewer times.
	rowsByRoute := make(map[uint64]MatrixRow, numRoutes)
	for _, row := range cols[0].Rows {
		rowsByRoute[row.RouteID] = row
	}
	for r := uint64(1); r <= numRoutes; r++ {
		row, ok := rowsByRoute[r]
		if !ok {
			t.Errorf("route %d: missing from Snapshot rows; Flush must not silently drop a registered route under burst", r)
			continue
		}
		if row.Writes != expectedPerRoute {
			t.Errorf("route %d: writes = %d, want exactly %d (no counts must be lost under concurrent burst)",
				row.RouteID, row.Writes, expectedPerRoute)
		}
		if row.WriteBytes != expectedBytesPerRoute {
			t.Errorf("route %d: writeBytes = %d, want exactly %d",
				row.RouteID, row.WriteBytes, expectedBytesPerRoute)
		}
	}
}

// runConcurrentBurst spawns numRoutes×writersPerRoute goroutines and
// releases them simultaneously so every route sees genuinely
// concurrent Observe traffic. Returns once every writer has finished.
//
// numRoutes is uint64 so the loop iterates in the same type the
// sampler uses for RouteID; the earlier `int` form needed a
// per-iteration `uint64(r)` cast that triggered gosec G115 and a
// nolint suppression, and CLAUDE.md says to refactor such casts away
// rather than suppress the finding (Claude bot round-2 P1 on PR #682).
func runConcurrentBurst(s *MemSampler, numRoutes uint64, writersPerRoute, opsPerWriter, keyLen, valueLen int) {
	var ready, start, done sync.WaitGroup
	total := int(numRoutes) * writersPerRoute
	ready.Add(total)
	done.Add(total)
	start.Add(1)
	for r := uint64(1); r <= numRoutes; r++ {
		routeID := r
		for w := 0; w < writersPerRoute; w++ {
			go func() {
				defer done.Done()
				ready.Done()
				start.Wait()
				for op := 0; op < opsPerWriter; op++ {
					s.Observe(routeID, OpWrite, keyLen, valueLen)
				}
			}()
		}
	}
	ready.Wait()
	start.Done()
	done.Wait()
}
```
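The benchmarks and the burst test lean on `fakeClock` and `newTestSampler` helpers that live elsewhere in `sampler_test.go` and are not part of this diff. A minimal reconstruction, inferred from the call sites above — the field and method shapes here are assumptions, not the file's actual definitions:

```go
package keyviz

import (
	"sync"
	"testing"
	"time"
)

// fakeClock is a deterministic time source handed to the sampler via
// MemSamplerOptions.Now, so tests control when a step boundary is crossed.
type fakeClock struct {
	mu  sync.Mutex
	now time.Time
}

// Now returns the current fake time; it is the function stored in
// MemSamplerOptions.Now.
func (c *fakeClock) Now() time.Time {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.now
}

// Advance moves the fake clock forward so the next Flush lands in a
// fresh step column.
func (c *fakeClock) Advance(d time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.now = c.now.Add(d)
}

// newTestSampler wires a fake clock into the options and returns both,
// matching the two-value call site in the burst test.
func newTestSampler(t *testing.T, opts MemSamplerOptions) (*MemSampler, *fakeClock) {
	t.Helper()
	clk := &fakeClock{now: time.Unix(1_700_000_000, 0)}
	opts.Now = clk.Now
	return NewMemSampler(opts), clk
}
```

Serializing reads through a mutex keeps the sketch race-detector-clean even if a future test advances the clock while writer goroutines are still observing.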
