@@ -1093,3 +1093,192 @@ func BenchmarkObserveMiss(b *testing.B) {
		s.Observe(99, OpRead, 16, 64)
	}
}

// BenchmarkObserveParallel pins the contention profile of the hot
// path. Each worker claims a distinct slot, so the only shared work
// is the atomic.Pointer load of the route table; per-slot atomic
// adds do not contend across goroutines. A regression that adds a
// shared mutex (or a global counter) to the hot path shows up as a
// sharp rise in ns/op as parallelism increases.
func BenchmarkObserveParallel(b *testing.B) {
	const numRoutes = 64
	s := NewMemSampler(MemSamplerOptions{Step: time.Second, HistoryColumns: 4, MaxTrackedRoutes: numRoutes})
	for r := uint64(1); r <= numRoutes; r++ {
		if !s.RegisterRoute(r, []byte{byte(r)}, []byte{byte(r) + 1}) {
			b.Fatalf("RegisterRoute(%d) returned false", r)
		}
	}
	var nextWorker atomic.Uint64 // needs the sync/atomic import
	b.ReportAllocs()
	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		// Hand each worker its own route; workers wrap around only
		// once parallelism exceeds numRoutes.
		route := nextWorker.Add(1)%numRoutes + 1
		for pb.Next() {
			s.Observe(route, OpWrite, 16, 64)
		}
	})
}
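
// Usage sketch (not part of the file's assertions): run at several
// parallelism levels and compare ns/op. Roughly flat ns/op across
// -cpu values is the healthy profile; a rise signals shared-state
// contention on the hot path.
//
//	go test -run '^$' -bench BenchmarkObserveParallel -cpu 1,4,16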

// BenchmarkRegisterRoute pins the route-mutation path: each call
// takes routesMu, deep-copies the immutable routeTable, applies the
// mutation, and republishes via atomic.Store. The route IDs grow
// monotonically, so each iteration copies the larger table left by
// the previous call; that is the shape that exercises the COW cost.
// A regression that switches to a shared mutable map shows up as
// flat ns/op that no longer grows with b.N.
func BenchmarkRegisterRoute(b *testing.B) {
	s := NewMemSampler(MemSamplerOptions{Step: time.Second, HistoryColumns: 4, MaxTrackedRoutes: b.N + 1})
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		id := uint64(i + 1) //nolint:gosec // i is bounded by b.N
		if !s.RegisterRoute(id, []byte{byte(id)}, []byte{byte(id) + 1}) {
			b.Fatalf("RegisterRoute(%d) returned false at i=%d", id, i)
		}
	}
}
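
// Because each Register copies the table left by the previous call,
// total work across the loop is quadratic in b.N and ns/op grows
// roughly linearly with it. Fixed iteration counts make that growth
// visible, e.g.:
//
//	go test -run '^$' -bench BenchmarkRegisterRoute -benchtime 1000x
//	go test -run '^$' -bench BenchmarkRegisterRoute -benchtime 100000x

// The fakeClock helper used by the benchmarks below is defined
// earlier in this file, outside the hunk. A minimal sketch consistent
// with how it is used here (Now as the sampler's time source, Advance
// to step it) would be:
//
//	type fakeClock struct {
//		mu  sync.Mutex
//		now time.Time
//	}
//
//	func (c *fakeClock) Now() time.Time {
//		c.mu.Lock()
//		defer c.mu.Unlock()
//		return c.now
//	}
//
//	func (c *fakeClock) Advance(d time.Duration) {
//		c.mu.Lock()
//		defer c.mu.Unlock()
//		c.now = c.now.Add(d)
//	}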

// BenchmarkFlush pins the per-step drain cost. Flush walks every
// live slot, drains its four counters with atomic.SwapUint64, and
// pushes a new MatrixColumn into the ring buffer. The hot-path
// Observe must never race with this drain (both sides are
// atomic-only), and Flush itself scales with the live route count;
// pin that cost so we notice if a future change introduces, say, an
// O(N²) slot scan.
func BenchmarkFlush(b *testing.B) {
	const numRoutes = 1024
	clk := &fakeClock{now: time.Unix(1_700_000_000, 0)}
	s := NewMemSampler(MemSamplerOptions{
		Step:             time.Second,
		HistoryColumns:   16,
		MaxTrackedRoutes: numRoutes,
		Now:              clk.Now,
	})
	for r := uint64(1); r <= numRoutes; r++ {
		if !s.RegisterRoute(r, []byte{byte(r >> 8), byte(r)}, []byte{byte((r + 1) >> 8), byte(r + 1)}) {
			b.Fatalf("RegisterRoute(%d) returned false", r)
		}
		// Pre-seed every slot with traffic so Flush has work to swap.
		s.Observe(r, OpWrite, 16, 64)
	}
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		s.Flush()
		// Keep the reseed out of the timing so ns/op reflects Flush
		// alone; 1024 Observes per iteration would otherwise skew it.
		b.StopTimer()
		clk.Advance(time.Second)
		for r := uint64(1); r <= numRoutes; r++ {
			s.Observe(r, OpWrite, 16, 64)
		}
		b.StartTimer()
	}
}
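
// To pin the linear scaling itself rather than a single point, a
// variant could sweep the route count with sub-benchmarks. Sketch
// only: benchmarkFlushN is a hypothetical helper extracted from the
// body above, parameterized on the route count.
//
//	func BenchmarkFlushSizes(b *testing.B) {
//		for _, n := range []int{256, 1024, 4096} {
//			b.Run(fmt.Sprintf("routes=%d", n), func(b *testing.B) {
//				benchmarkFlushN(b, n)
//			})
//		}
//	}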

// BenchmarkSnapshot pins the read-side pivot cost. Snapshot copies
// every column in the requested window into freshly allocated
// MatrixRows so the caller can mutate the result freely. With
// numRoutes × numColumns cells the pivot is O(N×C), and that copy is
// the dominant term in the admin handler's response latency.
func BenchmarkSnapshot(b *testing.B) {
	const (
		numRoutes  = 1024
		numColumns = 64
	)
	clk := &fakeClock{now: time.Unix(1_700_000_000, 0)}
	s := NewMemSampler(MemSamplerOptions{
		Step:             time.Second,
		HistoryColumns:   numColumns,
		MaxTrackedRoutes: numRoutes,
		Now:              clk.Now,
	})
	for r := uint64(1); r <= numRoutes; r++ {
		if !s.RegisterRoute(r, []byte{byte(r >> 8), byte(r)}, []byte{byte((r + 1) >> 8), byte(r + 1)}) {
			b.Fatalf("RegisterRoute(%d) returned false", r)
		}
	}
	// Fill the entire history window so every Snapshot pivots a
	// worst-case numRoutes × numColumns matrix.
	for c := 0; c < numColumns; c++ {
		for r := uint64(1); r <= numRoutes; r++ {
			s.Observe(r, OpWrite, 16, 64)
		}
		s.Flush()
		clk.Advance(time.Second)
	}
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Zero-value bounds select the whole retained window.
		_ = s.Snapshot(time.Time{}, time.Time{})
	}
}
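
// A bounded query over the same data would look like the sketch
// below. This assumes Snapshot treats its two arguments as a time
// window over the retained column timestamps, which the zero-value
// call above sidesteps.
//
//	from := clk.Now().Add(-10 * time.Second)
//	cols := s.Snapshot(from, clk.Now())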

// TestObserveExactCountUnderConcurrentBurst pins the Phase 2-A
// "no counts lost" invariant under the workload §5.2 calls out as
// the SLO target: many goroutines hammering many routes at once.
// With sub-sampling not yet implemented (sampleRate is always 1),
// every Observe must be reflected exactly in the post-Flush
// Snapshot. When sub-sampling lands and this invariant no longer
// holds, update this test alongside the estimator-based ±5% /
// 95%-CI assertion described in §5.2.
func TestObserveExactCountUnderConcurrentBurst(t *testing.T) {
	t.Parallel()
	const (
		numRoutes       = 32
		writersPerRoute = 8
		opsPerWriter    = 4_000
		keyLen          = 16
		valueLen        = 64
	)
	s, clk := newTestSampler(t, MemSamplerOptions{
		Step:             time.Second,
		HistoryColumns:   8,
		MaxTrackedRoutes: numRoutes,
	})
	for r := uint64(1); r <= numRoutes; r++ {
		if !s.RegisterRoute(r, []byte{byte(r)}, []byte{byte(r) + 1}) {
			t.Fatalf("RegisterRoute(%d) returned false", r)
		}
	}

	runConcurrentBurst(s, numRoutes, writersPerRoute, opsPerWriter, keyLen, valueLen)
	clk.Advance(time.Second)
	s.Flush()

	cols := s.Snapshot(time.Time{}, time.Time{})
	if len(cols) != 1 {
		t.Fatalf("expected 1 column after a single Flush, got %d", len(cols))
	}
	// A silently dropped route would otherwise pass the per-row checks
	// below, so assert the row count first.
	if len(cols[0].Rows) != numRoutes {
		t.Fatalf("expected %d rows, got %d", numRoutes, len(cols[0].Rows))
	}
	const expectedPerRoute = uint64(writersPerRoute * opsPerWriter)
	const expectedBytesPerRoute = expectedPerRoute * uint64(keyLen+valueLen)
	for _, row := range cols[0].Rows {
		// Errorf rather than Fatalf: report every affected route, not
		// just the first failure.
		if row.Writes != expectedPerRoute {
			t.Errorf("route %d: writes = %d, want exactly %d (no counts may be lost under a concurrent burst)",
				row.RouteID, row.Writes, expectedPerRoute)
		}
		if row.WriteBytes != expectedBytesPerRoute {
			t.Errorf("route %d: writeBytes = %d, want exactly %d",
				row.RouteID, row.WriteBytes, expectedBytesPerRoute)
		}
	}
}
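
// Once sub-sampling replaces exact counting, the equality checks
// above become a tolerance band. A sketch of the §5.2-style assertion
// (±5% around the estimator's point value; the 95%-CI plumbing would
// come from the estimator itself):
//
//	got := float64(row.Writes)
//	want := float64(expectedPerRoute)
//	if got < 0.95*want || got > 1.05*want {
//		t.Errorf("route %d: writes = %.0f outside ±5%% of %.0f", row.RouteID, got, want)
//	}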

// runConcurrentBurst spawns numRoutes × writersPerRoute goroutines
// and releases them simultaneously so every route sees genuinely
// concurrent Observe traffic. It returns once every writer has
// finished.
func runConcurrentBurst(s *MemSampler, numRoutes, writersPerRoute, opsPerWriter, keyLen, valueLen int) {
	var ready, start, done sync.WaitGroup
	ready.Add(numRoutes * writersPerRoute)
	done.Add(numRoutes * writersPerRoute)
	start.Add(1)
	for r := 1; r <= numRoutes; r++ {
		routeID := uint64(r) //nolint:gosec // r is bounded by numRoutes
		for w := 0; w < writersPerRoute; w++ {
			go func() {
				defer done.Done()
				// Announce readiness, then block on the start barrier
				// so all writers begin at the same instant.
				ready.Done()
				start.Wait()
				for op := 0; op < opsPerWriter; op++ {
					s.Observe(routeID, OpWrite, keyLen, valueLen)
				}
			}()
		}
	}
	ready.Wait()
	start.Done()
	done.Wait()
}
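
// Design note: start acts as a one-shot barrier (Add(1) up front, a
// single Done() releases every blocked Wait() at once). Closing a
// shared channel would achieve the same release; the WaitGroup form
// keeps all three rendezvous points in one mechanism.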