1919
2020#include < atomic>
2121#include < chrono>
22- #include < cstring>
23- #include < iostream>
2422#include < memory>
2523#include < thread>
2624#include < vector>
2725
28- #include " ../../common/benchmark.hpp"
29-
3026namespace asio = boost::asio;
3127using tcp = asio::ip::tcp;
3228using asio_bench::tcp_acceptor;
@@ -78,8 +74,8 @@ struct sequential_churn_op
7874 tcp_acceptor& acc;
7975 tcp::endpoint ep;
8076 std::atomic<bool >& running;
81- int64_t & cycles;
8277 perf::statistics& latency_stats;
78+ std::atomic<int64_t >& ops;
8379 std::unique_ptr<tcp_socket> client;
8480 std::unique_ptr<tcp_socket> server;
8581 perf::stopwatch sw;
@@ -154,73 +150,53 @@ struct sequential_churn_op
154150 client->close ();
155151 server->close ();
156152
157- latency_stats.add (sw.elapsed_us ());
158- ++cycles ;
153+ latency_stats.add (sw.elapsed_ns ());
154+ ops. fetch_add ( 1 , std::memory_order_relaxed) ;
159155 start ();
160156 }
161157};
162158
163159// Single connect/accept/1-byte-exchange/close loop. Compared against the
164160// coroutine variant, the difference isolates coroutine suspend/resume overhead.
165- bench::benchmark_result
166- bench_sequential_churn (double duration_s )
161+ void
162+ bench_sequential_churn (bench::state& state )
167163{
168- perf::print_header (" Sequential Accept Churn (Asio Callbacks)" );
169-
170164 asio::io_context ioc;
171165 auto acc = make_churn_acceptor ( ioc );
172166 auto ep = tcp::endpoint ( asio::ip::address_v4::loopback (), acc.local_endpoint ().port () );
173167
174168 std::atomic<bool > running{true };
175- int64_t cycles = 0 ;
176- perf::statistics latency_stats;
177169
178- sequential_churn_op op{ioc, acc, ep, running, cycles ,
179- latency_stats , {}, {}, {}};
170+ sequential_churn_op op{ioc, acc, ep, running, state. latency () ,
171+ state. ops () , {}, {}, {}};
180172
181173 perf::stopwatch total_sw;
182174
183175 op.start ();
184176
185177 std::thread timer ([&]() {
186- std::this_thread::sleep_for (std::chrono::duration<double >(duration_s ));
178+ std::this_thread::sleep_for (std::chrono::duration<double >(state. duration () ));
187179 running.store (false , std::memory_order_relaxed);
188180 ioc.stop ();
189181 });
190182
191183 ioc.run ();
192184 timer.join ();
193185
194- double elapsed = total_sw.elapsed_seconds ();
195- double conns_per_sec = static_cast <double >(cycles) / elapsed;
196-
197- std::cout << " Cycles: " << cycles << " \n " ;
198- std::cout << " Elapsed: " << std::fixed << std::setprecision (3 )
199- << elapsed << " s\n " ;
200- std::cout << " Throughput: " << perf::format_rate (conns_per_sec) << " \n " ;
201- perf::print_latency_stats (latency_stats, " Cycle latency" );
202- std::cout << " \n " ;
203-
186+ state.set_elapsed (total_sw.elapsed_seconds ());
204187 acc.close ();
205-
206- return bench::benchmark_result (" sequential" )
207- .add (" cycles" , static_cast <double >(cycles))
208- .add (" elapsed_s" , elapsed)
209- .add (" conns_per_sec" , conns_per_sec)
210- .add_latency_stats (" cycle_latency" , latency_stats);
211188}
212189
213190// N independent accept loops on separate listeners. Reveals whether
214191// fd allocation or acceptor state scales linearly under callbacks.
215- bench::benchmark_result
216- bench_concurrent_churn (int num_loops, double duration_s )
192+ void
193+ bench_concurrent_churn (bench::state& state )
217194{
218- std::cout << " Concurrent loops: " << num_loops << " \n " ;
195+ int num_loops = static_cast <int >(state.range (0 ));
196+ state.counters [" num_loops" ] = num_loops;
219197
220198 asio::io_context ioc;
221199 std::atomic<bool > running{true };
222- std::vector<int64_t > cycle_counts (num_loops, 0 );
223- std::vector<perf::statistics> stats (num_loops);
224200
225201 std::vector<tcp_acceptor> acceptors;
226202 acceptors.reserve ( num_loops );
@@ -238,53 +214,22 @@ bench_concurrent_churn(int num_loops, double duration_s)
238214 asio::ip::address_v4::loopback (), acceptors[i].local_endpoint ().port () );
239215 ops.push_back ( std::make_unique<sequential_churn_op>(
240216 sequential_churn_op{ ioc, acceptors[i], ep, running,
241- cycle_counts[i], stats[i] , {}, {}, {} } ) );
217+ state. latency (), state. ops () , {}, {}, {} } ) );
242218 ops.back ()->start ();
243219 }
244220
245221 std::thread stopper ([&]() {
246- std::this_thread::sleep_for (std::chrono::duration<double >(duration_s ));
222+ std::this_thread::sleep_for (std::chrono::duration<double >(state. duration () ));
247223 running.store (false , std::memory_order_relaxed);
248224 ioc.stop ();
249225 });
250226
251227 ioc.run ();
252228 stopper.join ();
253229
254- double elapsed = total_sw.elapsed_seconds ();
255-
256- int64_t total_cycles = 0 ;
257- for (auto c : cycle_counts)
258- total_cycles += c;
259-
260- double conns_per_sec = static_cast <double >(total_cycles) / elapsed;
261-
262- double total_mean = 0 ;
263- double total_p99 = 0 ;
264- for (auto & s : stats)
265- {
266- total_mean += s.mean ();
267- total_p99 += s.p99 ();
268- }
269-
270- std::cout << " Total cycles: " << total_cycles << " \n " ;
271- std::cout << " Elapsed: " << std::fixed << std::setprecision (3 )
272- << elapsed << " s\n " ;
273- std::cout << " Throughput: " << perf::format_rate (conns_per_sec) << " \n " ;
274- std::cout << " Avg mean latency: "
275- << perf::format_latency (total_mean / num_loops) << " \n " ;
276- std::cout << " Avg p99 latency: "
277- << perf::format_latency (total_p99 / num_loops) << " \n\n " ;
278-
230+ state.set_elapsed (total_sw.elapsed_seconds ());
279231 for ( auto & a : acceptors )
280232 a.close ();
281-
282- return bench::benchmark_result (" concurrent_" + std::to_string (num_loops))
283- .add (" num_loops" , num_loops)
284- .add (" total_cycles" , static_cast <double >(total_cycles))
285- .add (" conns_per_sec" , conns_per_sec)
286- .add (" avg_mean_latency_us" , total_mean / num_loops)
287- .add (" avg_p99_latency_us" , total_p99 / num_loops);
288233}
289234
290235// Burst: open N connections, accept all, close all, repeat
@@ -294,8 +239,8 @@ struct burst_churn_op
294239 tcp_acceptor& acc;
295240 tcp::endpoint ep;
296241 std::atomic<bool >& running;
297- int64_t & total_accepted;
298242 perf::statistics& burst_stats;
243+ std::atomic<int64_t >& ops;
299244 int burst_size;
300245
301246 std::vector<std::unique_ptr<tcp_socket>> clients;
@@ -344,7 +289,6 @@ struct burst_churn_op
344289 if (ec)
345290 return ;
346291 ++accepted_count;
347- ++total_accepted;
348292 if (accepted_count == burst_size)
349293 close_all ();
350294 });
@@ -358,89 +302,60 @@ struct burst_churn_op
358302 for (auto & s : servers)
359303 s->close ();
360304
361- burst_stats.add (sw.elapsed_us ());
305+ burst_stats.add (sw.elapsed_ns ());
306+ ops.fetch_add (1 , std::memory_order_relaxed);
362307 start ();
363308 }
364309};
365310
366- // Burst N connects then accept all — stresses the listen backlog and
311+ // Burst N connects then accept all -- stresses the listen backlog and
367312// batched fd creation. Reveals whether the acceptor handles connection
368313// storms gracefully or suffers from backlog overflow.
369- bench::benchmark_result
370- bench_burst_churn (int burst_size, double duration_s )
314+ void
315+ bench_burst_churn (bench::state& state )
371316{
372- std::cout << " Burst size: " << burst_size << " \n " ;
317+ int burst_size = static_cast <int >(state.range (0 ));
318+ state.counters [" burst_size" ] = burst_size;
373319
374320 asio::io_context ioc;
375321 auto acc = make_churn_acceptor ( ioc );
376322 auto ep = tcp::endpoint ( asio::ip::address_v4::loopback (), acc.local_endpoint ().port () );
377323
378324 std::atomic<bool > running{true };
379- int64_t total_accepted = 0 ;
380- perf::statistics burst_stats;
381325
382- burst_churn_op op{ioc, acc, ep, running, total_accepted ,
383- burst_stats , burst_size, {}, {}, {},
326+ burst_churn_op op{ioc, acc, ep, running, state. latency () ,
327+ state. ops () , burst_size, {}, {}, {},
384328 {}};
385329
386330 perf::stopwatch total_sw;
387331
388332 op.start ();
389333
390334 std::thread stopper ([&]() {
391- std::this_thread::sleep_for (std::chrono::duration<double >(duration_s ));
335+ std::this_thread::sleep_for (std::chrono::duration<double >(state. duration () ));
392336 running.store (false , std::memory_order_relaxed);
393337 ioc.stop ();
394338 });
395339
396340 ioc.run ();
397341 stopper.join ();
398342
399- double elapsed = total_sw.elapsed_seconds ();
400- double accepts_per_sec = static_cast <double >(total_accepted) / elapsed;
401-
402- std::cout << " Total accepted: " << total_accepted << " \n " ;
403- std::cout << " Elapsed: " << std::fixed << std::setprecision (3 )
404- << elapsed << " s\n " ;
405- std::cout << " Accept rate: " << perf::format_rate (accepts_per_sec)
406- << " \n " ;
407- perf::print_latency_stats (burst_stats, " Burst latency" );
408- std::cout << " \n " ;
409-
343+ state.set_elapsed (total_sw.elapsed_seconds ());
410344 acc.close ();
411-
412- return bench::benchmark_result (" burst_" + std::to_string (burst_size))
413- .add (" burst_size" , burst_size)
414- .add (" total_accepted" , static_cast <double >(total_accepted))
415- .add (" accepts_per_sec" , accepts_per_sec)
416- .add_latency_stats (" burst_latency" , burst_stats);
417345}
418346
419347} // anonymous namespace
420348
421- void
422- run_accept_churn_benchmarks (
423- bench::result_collector& collector, char const * filter, double duration_s)
349+ bench::benchmark_suite
350+ make_accept_churn_suite ()
424351{
425- bool run_all = !filter || std::strcmp (filter, " all" ) == 0 ;
426-
427- if (run_all || std::strcmp (filter, " sequential" ) == 0 )
428- collector.add (bench_sequential_churn (duration_s));
429-
430- if (run_all || std::strcmp (filter, " concurrent" ) == 0 )
431- {
432- perf::print_header (" Concurrent Accept Churn (Asio Callbacks)" );
433- collector.add (bench_concurrent_churn (1 , duration_s));
434- collector.add (bench_concurrent_churn (4 , duration_s));
435- collector.add (bench_concurrent_churn (16 , duration_s));
436- }
437-
438- if (run_all || std::strcmp (filter, " burst" ) == 0 )
439- {
440- perf::print_header (" Burst Accept Churn (Asio Callbacks)" );
441- collector.add (bench_burst_churn (10 , duration_s));
442- collector.add (bench_burst_churn (100 , duration_s));
443- }
352+ using F = bench::bench_flags;
353+ return bench::benchmark_suite (" accept_churn" , F::needs_conntrack_drain)
354+ .add (" sequential" , bench_sequential_churn)
355+ .add (" concurrent" , bench_concurrent_churn)
356+ .args ({1 , 4 , 16 })
357+ .add (" burst" , bench_burst_churn)
358+ .args ({10 , 100 });
444359}
445360
446361} // namespace asio_callback_bench
0 commit comments