Merged
250 changes: 250 additions & 0 deletions keyviz/sampler_test.go
@@ -1093,3 +1093,253 @@ func BenchmarkObserveMiss(b *testing.B) {
s.Observe(99, OpRead, 16, 64)
}
}

// BenchmarkObserveParallel pins the contention profile of the hot
// path. Each goroutine is shard-pinned to a disjoint range of route
// IDs so per-slot atomic adds genuinely never contend across
// goroutines and the only shared work is the atomic.Pointer load of
// the route table. A regression that adds a shared mutex (or a
// global counter) on the hot path will show up as a sharp rise in
// ns/op as parallelism climbs.
//
// numRoutes is sized for numRoutes / routesPerShard >= 64
// disjoint shards so the benchmark stays meaningful up to
// GOMAXPROCS = 64. The previous draft (numRoutes = 64,
// routesPerShard = 4) only had 16 shards and silently regressed
// to shared-counter contention on bigger CI runners — the very
// regression class this benchmark exists to detect (Claude bot
// round-2 P2 on PR #682).
func BenchmarkObserveParallel(b *testing.B) {
const (
numRoutes = 256
routesPerShard = 4
)
s := NewMemSampler(MemSamplerOptions{Step: time.Second, HistoryColumns: 4, MaxTrackedRoutes: numRoutes})
for r := uint64(1); r <= numRoutes; r++ {
if !s.RegisterRoute(r, []byte{byte(r >> 8), byte(r)}, []byte{byte((r + 1) >> 8), byte(r + 1)}) {
b.Fatalf("RegisterRoute(%d) returned false", r)
}
}
var nextShard atomic.Uint64
b.ReportAllocs()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
shardIndex := nextShard.Add(1) - 1
shardBase := (shardIndex * routesPerShard) % numRoutes
Comment on lines +1127 to +1128
P2: Size benchmark shards to parallel worker count

BenchmarkObserveParallel still reintroduces cross-goroutine counter contention when RunParallel launches more than 16 workers (its default is GOMAXPROCS): with numRoutes=64 and routesPerShard=4, shardBase := (shardIndex * routesPerShard) % numRoutes wraps and assigns multiple workers to the same 4-route shard. In that environment, the benchmark no longer isolates the shared table.Load cost it claims to pin, so ns/op becomes host-core-count dependent and can mask/blur hot-path regressions.


var i uint64
for pb.Next() {
s.Observe(shardBase+(i%routesPerShard)+1, OpWrite, 16, 64)
i++
}
})
Comment on lines +1126 to +1134
medium

The implementation of BenchmarkObserveParallel does not align with its description. The comment states that each goroutine hits a distinct slot to avoid contention, but since i is a local variable within the RunParallel closure, every goroutine starts at i=0 and iterates through the same sequence of slots. This results in high contention on the atomic counters of the first few slots, which likely masks the performance of the atomic.Pointer load and map lookup you intended to measure. To fix this, you can use an atomic counter to give each goroutine a unique starting offset. This aligns with the preference for using atomic operations to manage hot paths and reduce contention.

	var gid atomic.Uint64
	b.RunParallel(func(pb *testing.PB) {
		offset := gid.Add(1)
		var i uint64
		for pb.Next() {
			s.Observe(((offset+i)%numRoutes)+1, OpWrite, 16, 64)
			i++
		}
	})
References
  1. For frequently accessed fields that require monotonic updates, prefer atomic operations over mutexes to improve performance on hot paths.

}
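
For reference, a minimal Go sketch of the hot path this benchmark pins, following the comment's own description (one atomic.Pointer load of the route table, then per-slot atomic adds). The routeSlot type, the map-shaped table, and observeSketch are illustrative assumptions, not the sampler's real internals:

type routeSlot struct {
	writes, writeBytes atomic.Uint64
}

// observeSketch mirrors the shape of the hot path described above:
// table.Load is the only shared read, and the per-slot Adds contend
// only when two goroutines hit the same route.
func observeSketch(table *atomic.Pointer[map[uint64]*routeSlot], routeID uint64, keyLen, valueLen int) {
	t := table.Load()
	if t == nil {
		return
	}
	sl, ok := (*t)[routeID]
	if !ok {
		return
	}
	sl.writes.Add(1)
	sl.writeBytes.Add(uint64(keyLen + valueLen))
}

Running the benchmark with go test -bench=ObserveParallel -cpu=1,4,16,64 makes the parallelism sweep explicit: roughly flat ns/op across the sweep is the healthy profile, and a rising curve is the contention signature described above.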

// BenchmarkRegisterRoute pins the route-mutation path: each call
// takes routesMu, deep-copies the immutable routeTable, mutates, and
// republishes via atomic.Store. The benchmark holds a fixed-size
// table (registerBenchTableSize routes pre-loaded) and toggles a
// route in/out on each iteration so b.N controls the iteration count
// only, not the table size. The earlier draft made b.N drive both
// the iteration count and the MaxTrackedRoutes cap, so each call's
// deep copy scaled with the growing table: total cost was O(N²) and
// ns/op (which Go derives by dividing total time by b.N) grew with N
// instead of settling at a stable per-call figure (Gemini medium on
// PR #682).
func BenchmarkRegisterRoute(b *testing.B) {
const registerBenchTableSize = 1024
s := NewMemSampler(MemSamplerOptions{
Step: time.Second,
HistoryColumns: 4,
MaxTrackedRoutes: registerBenchTableSize + 1,
})
for r := uint64(1); r <= registerBenchTableSize; r++ {
if !s.RegisterRoute(r, []byte{byte(r >> 8), byte(r)}, []byte{byte((r + 1) >> 8), byte(r + 1)}) {
b.Fatalf("seed RegisterRoute(%d) returned false", r)
}
}
churnID := uint64(registerBenchTableSize + 1)
startKey := []byte{byte(churnID >> 8), byte(churnID & 0xFF)}
endKey := []byte{byte((churnID + 1) >> 8), byte((churnID + 1) & 0xFF)}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
s.RemoveRoute(churnID)
if !s.RegisterRoute(churnID, startKey, endKey) {
b.Fatalf("RegisterRoute(%d) returned false at i=%d", churnID, i)
}
}
}
Comment on lines +1147 to +1170
medium

This benchmark has $O(N^2)$ complexity because it uses b.N both for the number of iterations and as the scale of the data structure (via MaxTrackedRoutes and the number of registered routes). As b.N increases, each RegisterRoute call becomes more expensive due to the COW operation on an increasingly large table. This prevents the benchmark from reaching a stable ns/op value and makes the results dependent on the total execution time. It is better to benchmark the registration cost at specific, fixed table sizes (e.g., using sub-benchmarks) to ensure reproducible and meaningful results.
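
To make the per-call cost concrete, here is a minimal sketch of the copy-on-write mutation path the benchmark comment describes, reusing the illustrative routeSlot type from the sketch above; the mu and table parameters are stand-ins for routesMu and the sampler's route table, not its real signature:

// registerSketch shows why RegisterRoute cost tracks table size: the
// writer deep-copies the whole map before republishing, while readers
// keep loading the old table until the Store lands.
func registerSketch(mu *sync.Mutex, table *atomic.Pointer[map[uint64]*routeSlot], id uint64) {
	mu.Lock()
	defer mu.Unlock()
	old := table.Load() // assumed always published non-nil
	next := make(map[uint64]*routeSlot, len(*old)+1)
	for k, v := range *old { // deep copy: O(table size) on every mutation
		next[k] = v
	}
	next[id] = &routeSlot{}
	table.Store(&next) // republish; Observe picks up the new table on its next Load
}

Under this pattern each call costs O(current table size), which is exactly why the benchmark pins the table at 1024 routes and lets b.N drive only the churn count.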


// BenchmarkFlush pins the per-step drain cost. Flush walks every
// live slot, atomic.SwapUint64s its four counters, and pushes a new
// MatrixColumn into the ring buffer. The hot-path Observe may run
// concurrently with this drain without racing (both sides are
// atomic-only), but Flush itself scales with the live route count;
// pin its cost so we notice if a future change adds e.g. an O(N²)
// slot scan.
//
// b.StopTimer brackets the per-iteration reseed loop so the reported
// ns/op reflects the Flush cost only. Including the 1024 Observe
// calls in the timed range inflated the number by ~25% and let a
// real Flush regression hide behind Observe variance (Gemini medium
// on PR #682).
func BenchmarkFlush(b *testing.B) {
const numRoutes = 1024
clk := &fakeClock{now: time.Unix(1_700_000_000, 0)}
s := NewMemSampler(MemSamplerOptions{
Step: time.Second,
HistoryColumns: 16,
MaxTrackedRoutes: numRoutes,
Now: clk.Now,
})
for r := uint64(1); r <= numRoutes; r++ {
if !s.RegisterRoute(r, []byte{byte(r >> 8), byte(r)}, []byte{byte((r + 1) >> 8), byte(r + 1)}) {
b.Fatalf("RegisterRoute(%d) returned false", r)
}
// Pre-seed every slot with traffic so the first Flush has work to swap.
s.Observe(r, OpWrite, 16, 64)
}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
s.Flush()
b.StopTimer()
clk.Advance(time.Second)
// Reseed so the next Flush still has counters to drain.
for r := uint64(1); r <= numRoutes; r++ {
s.Observe(r, OpWrite, 16, 64)
}
b.StartTimer()
}
Comment on lines +1202 to +1211
medium

The Observe loop used to reseed the sampler is included in the timed portion of the benchmark. While Observe is fast, calling it 1024 times per iteration adds significant overhead (roughly 20-25% based on your reported numbers) that obscures the actual cost of Flush. Consider using b.StopTimer() and b.StartTimer() around the reseeding loop to isolate the measurement of the Flush operation.

	for i := 0; i < b.N; i++ {
		s.Flush()
		clk.Advance(time.Second)
		b.StopTimer()
		// Reseed so the next Flush still has counters to drain.
		for r := uint64(1); r <= numRoutes; r++ {
			s.Observe(r, OpWrite, 16, 64)
		}
		b.StartTimer()
	}

}
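
The swap-based drain can be sketched per slot as follows, again using the illustrative routeSlot type; only the MatrixRow field names already asserted on later in this file (RouteID, Writes, WriteBytes) are taken from the real API:

// drainSketch reads and zeroes each counter in a single atomic Swap,
// so a concurrent Observe increment lands either in this column or
// the next one, never in neither.
func drainSketch(routeID uint64, sl *routeSlot) MatrixRow {
	return MatrixRow{
		RouteID:    routeID,
		Writes:     sl.writes.Swap(0),
		WriteBytes: sl.writeBytes.Swap(0),
	}
}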

// BenchmarkSnapshot pins the read-side pivot cost. Snapshot copies
// every column in the requested window into freshly allocated
// MatrixRows so the caller can freely mutate the result. With
// numRoutes×numColumns cells the pivot is O(N×C) and its cost is the
// dominant term in the admin handler's response latency.
func BenchmarkSnapshot(b *testing.B) {
const (
numRoutes = 1024
numColumns = 64
)
clk := &fakeClock{now: time.Unix(1_700_000_000, 0)}
s := NewMemSampler(MemSamplerOptions{
Step: time.Second,
HistoryColumns: numColumns,
MaxTrackedRoutes: numRoutes,
Now: clk.Now,
})
for r := uint64(1); r <= numRoutes; r++ {
if !s.RegisterRoute(r, []byte{byte(r >> 8), byte(r)}, []byte{byte((r + 1) >> 8), byte(r + 1)}) {
b.Fatalf("RegisterRoute(%d) returned false", r)
}
}
for c := 0; c < numColumns; c++ {
for r := uint64(1); r <= numRoutes; r++ {
s.Observe(r, OpWrite, 16, 64)
}
s.Flush()
clk.Advance(time.Second)
}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = s.Snapshot(time.Time{}, time.Time{})
}
}
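
A minimal sketch of the O(N×C) pivot the comment describes; treating Rows as MatrixColumn's payload is an assumption here, inferred from the cols[0].Rows access in the test below:

// snapshotSketch copies every column's rows into fresh backing
// arrays so the caller can mutate the result without touching the
// sampler's ring buffer.
func snapshotSketch(window []MatrixColumn) []MatrixColumn {
	out := make([]MatrixColumn, len(window))
	for i, col := range window {
		rows := make([]MatrixRow, len(col.Rows))
		copy(rows, col.Rows)
		out[i] = MatrixColumn{Rows: rows}
	}
	return out
}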

// TestObserveExactCountUnderConcurrentBurst pins the Phase 2-A
// "no counts lost" invariant under the kind of workload §5.2 calls
// out as the SLO target: many goroutines hammering many routes
// simultaneously. With sub-sampling not yet implemented (sampleRate
// = 1 always), every Observe must be reflected exactly in the
// post-Flush Snapshot. When sub-sampling lands and this invariant
// no longer holds, this test must be updated alongside the new
// estimator-based ±5% / 95%-CI assertion described in §5.2.
func TestObserveExactCountUnderConcurrentBurst(t *testing.T) {
t.Parallel()
const (
numRoutes = 32
writersPerRoute = 8
opsPerWriter = 4_000
keyLen = 16
valueLen = 64
)
s, clk := newTestSampler(t, MemSamplerOptions{
Step: time.Second,
HistoryColumns: 8,
MaxTrackedRoutes: numRoutes,
})
for r := uint64(1); r <= numRoutes; r++ {
if !s.RegisterRoute(r, []byte{byte(r)}, []byte{byte(r) + 1}) {
t.Fatalf("RegisterRoute(%d) returned false", r)
}
}

runConcurrentBurst(s, numRoutes, writersPerRoute, opsPerWriter, keyLen, valueLen)
clk.Advance(time.Second)
s.Flush()

cols := s.Snapshot(time.Time{}, time.Time{})
if len(cols) != 1 {
t.Fatalf("expected 1 column after a single Flush, got %d", len(cols))
}
const expectedPerRoute = uint64(writersPerRoute * opsPerWriter)
const expectedBytesPerRoute = expectedPerRoute * uint64(keyLen+valueLen)

// Codex P1 on PR #682: assert every registered route appears in
// the column. Without this index, a future Flush regression that
// silently drops a route's row would still pass the per-row
// counter checks below — the loop would just iterate fewer times.
rowsByRoute := make(map[uint64]MatrixRow, numRoutes)
for _, row := range cols[0].Rows {
P1: Assert all routes are present in burst-count test

This test only validates rows that appear in cols[0].Rows, but it never checks that there is one row per registered route. Because Flush omits zero-value rows, a regression that drops all writes for one or more routes would simply omit those rows and still pass this test, weakening the stated “no counts lost” guarantee under concurrent burst.


rowsByRoute[row.RouteID] = row
}
for r := uint64(1); r <= numRoutes; r++ {
row, ok := rowsByRoute[r]
if !ok {
t.Errorf("route %d: missing from Snapshot rows; Flush must not silently drop a registered route under burst", r)
continue
}
if row.Writes != expectedPerRoute {
t.Errorf("route %d: writes = %d, want exactly %d (no counts must be lost under concurrent burst)",
row.RouteID, row.Writes, expectedPerRoute)
}
if row.WriteBytes != expectedBytesPerRoute {
t.Errorf("route %d: writeBytes = %d, want exactly %d",
row.RouteID, row.WriteBytes, expectedBytesPerRoute)
}
}
}

// runConcurrentBurst spawns numRoutes×writersPerRoute goroutines and
// releases them simultaneously so every route sees genuinely
// concurrent Observe traffic. Returns once every writer has finished.
//
// numRoutes is uint64 so the loop iterates in the same type the
// sampler uses for RouteID; the earlier `int` form needed a
// per-iteration `uint64(r)` cast that triggered gosec G115 and a
// nolint suppression, which CLAUDE.md tells us to refactor away
// rather than suppress (Claude bot round-2 P1 on PR #682).
func runConcurrentBurst(s *MemSampler, numRoutes uint64, writersPerRoute, opsPerWriter, keyLen, valueLen int) {
var ready, start, done sync.WaitGroup
total := int(numRoutes) * writersPerRoute
ready.Add(total)
done.Add(total)
start.Add(1)
for r := uint64(1); r <= numRoutes; r++ {
routeID := r
for w := 0; w < writersPerRoute; w++ {
go func() {
defer done.Done()
ready.Done()
start.Wait()
for op := 0; op < opsPerWriter; op++ {
s.Observe(routeID, OpWrite, keyLen, valueLen)
}
}()
}
}
ready.Wait()
start.Done()
done.Wait()
}