@@ -187,7 +187,7 @@ async fn aggregate_string(
187187 ctx : SessionContext ,
188188 limit : usize ,
189189 use_topk : bool ,
190- ) -> Result < ( ) > {
190+ ) -> Result < Vec < RecordBatch > > {
191191 let sql = format ! (
192192 "select max(trace_id) from traces group by timestamp_ms order by max(trace_id) desc limit {limit};"
193193 ) ;
@@ -204,7 +204,7 @@ async fn aggregate_string(
204204 let batch = batches. first ( ) . unwrap ( ) ;
205205 assert_eq ! ( batch. num_rows( ) , LIMIT ) ;
206206
207- Ok ( ( ) )
207+ Ok ( batches )
208208}
209209
210210async fn aggregate_distinct (
@@ -285,157 +285,213 @@ async fn aggregate_distinct(
285285 Ok ( ( ) )
286286}
287287
288- fn criterion_benchmark ( c : & mut Criterion ) {
289- let rt = Runtime :: new ( ) . unwrap ( ) ;
290- let limit = LIMIT ;
291- let partitions = 10 ;
292- let samples = 1_000_000 ;
293-
294- let ctx = rt
295- . block_on ( create_context ( partitions, samples, false , false , false ) )
296- . unwrap ( ) ;
297- c. bench_function (
298- format ! ( "aggregate {} time-series rows" , partitions * samples) . as_str ( ) ,
299- |b| b. iter ( || run ( & rt, ctx. clone ( ) , limit, false , false ) ) ,
300- ) ;
301-
302- let ctx = rt
303- . block_on ( create_context ( partitions, samples, true , false , false ) )
304- . unwrap ( ) ;
305- c. bench_function (
306- format ! ( "aggregate {} worst-case rows" , partitions * samples) . as_str ( ) ,
307- |b| b. iter ( || run ( & rt, ctx. clone ( ) , limit, false , true ) ) ,
308- ) ;
288+ struct BenchCase < ' a > {
289+ name_tpl : & ' a str ,
290+ asc : bool ,
291+ use_topk : bool ,
292+ use_view : bool ,
293+ }
309294
310- let ctx = rt
311- . block_on ( create_context ( partitions, samples, false , true , false ) )
312- . unwrap ( ) ;
313- c. bench_function (
314- format ! (
315- "top k={limit} aggregate {} time-series rows" ,
316- partitions * samples
317- )
318- . as_str ( ) ,
319- |b| b. iter ( || run ( & rt, ctx. clone ( ) , limit, true , false ) ) ,
320- ) ;
295+ struct StringCase {
296+ asc : bool ,
297+ use_topk : bool ,
298+ use_view : bool ,
299+ }
321300
322- let ctx = rt
323- . block_on ( create_context ( partitions, samples, true , true , false ) )
301+ fn assert_utf8_utf8view_match (
302+ rt : & Runtime ,
303+ partitions : i32 ,
304+ samples : i32 ,
305+ limit : usize ,
306+ asc : bool ,
307+ use_topk : bool ,
308+ ) {
309+ let ctx_utf8 = rt
310+ . block_on ( create_context ( partitions, samples, asc, use_topk, false ) )
324311 . unwrap ( ) ;
325- c. bench_function (
326- format ! (
327- "top k={limit} aggregate {} worst-case rows" ,
328- partitions * samples
329- )
330- . as_str ( ) ,
331- |b| b. iter ( || run ( & rt, ctx. clone ( ) , limit, true , true ) ) ,
332- ) ;
333-
334- // Utf8View schema,time-series rows
335- let ctx = rt
336- . block_on ( create_context ( partitions, samples, false , true , true ) )
312+ let ctx_view = rt
313+ . block_on ( create_context ( partitions, samples, asc, use_topk, true ) )
337314 . unwrap ( ) ;
338- c. bench_function (
339- format ! (
340- "top k={limit} aggregate {} time-series rows [Utf8View]" ,
341- partitions * samples
342- )
343- . as_str ( ) ,
344- |b| b. iter ( || run ( & rt, ctx. clone ( ) , limit, true , false ) ) ,
345- ) ;
346-
347- // Utf8View schema,worst-case rows
348- let ctx = rt
349- . block_on ( create_context ( partitions, samples, true , true , true ) )
315+ let batches_utf8 = rt
316+ . block_on ( aggregate_string ( ctx_utf8, limit, use_topk) )
350317 . unwrap ( ) ;
351- c. bench_function (
352- format ! (
353- "top k={limit} aggregate {} worst-case rows [Utf8View]" ,
354- partitions * samples
355- )
356- . as_str ( ) ,
357- |b| b. iter ( || run ( & rt, ctx. clone ( ) , limit, true , true ) ) ,
358- ) ;
359-
360- // String aggregate benchmarks - grouping by timestamp, aggregating string column
361- let ctx = rt
362- . block_on ( create_context ( partitions, samples, false , true , false ) )
318+ let batches_view = rt
319+ . block_on ( aggregate_string ( ctx_view, limit, use_topk) )
363320 . unwrap ( ) ;
364- c. bench_function (
365- format ! (
366- "top k={limit} string aggregate {} time-series rows [Utf8]" ,
367- partitions * samples
368- )
369- . as_str ( ) ,
370- |b| b. iter ( || run_string ( & rt, ctx. clone ( ) , limit, true ) ) ,
321+ let result_utf8 = pretty_format_batches ( & batches_utf8) . unwrap ( ) . to_string ( ) ;
322+ let result_view = pretty_format_batches ( & batches_view) . unwrap ( ) . to_string ( ) ;
323+ assert_eq ! (
324+ result_utf8, result_view,
325+ "Utf8 vs Utf8View mismatch for asc={asc}, use_topk={use_topk}"
371326 ) ;
327+ }
372328
373- let ctx = rt
374- . block_on ( create_context ( partitions, samples, true , true , false ) )
375- . unwrap ( ) ;
376- c. bench_function (
377- format ! (
378- "top k={limit} string aggregate {} worst-case rows [Utf8]" ,
379- partitions * samples
380- )
381- . as_str ( ) ,
382- |b| b. iter ( || run_string ( & rt, ctx. clone ( ) , limit, true ) ) ,
383- ) ;
329+ fn assert_string_results_match (
330+ rt : & Runtime ,
331+ partitions : i32 ,
332+ samples : i32 ,
333+ limit : usize ,
334+ ) {
335+ for asc in [ false , true ] {
336+ for use_topk in [ false , true ] {
337+ assert_utf8_utf8view_match ( rt, partitions, samples, limit, asc, use_topk) ;
338+ }
339+ }
340+ }
384341
385- let ctx = rt
386- . block_on ( create_context ( partitions, samples, false , true , true ) )
387- . unwrap ( ) ;
388- c. bench_function (
389- format ! (
390- "top k={limit} string aggregate {} time-series rows [Utf8View]" ,
391- partitions * samples
392- )
393- . as_str ( ) ,
394- |b| b. iter ( || run_string ( & rt, ctx. clone ( ) , limit, true ) ) ,
395- ) ;
342+ fn criterion_benchmark ( c : & mut Criterion ) {
343+ let rt = Runtime :: new ( ) . unwrap ( ) ;
344+ let limit = LIMIT ;
345+ let partitions = 10 ;
346+ let samples = 1_000_000 ;
347+ let total_rows = partitions * samples;
348+
349+ // Numeric aggregate benchmarks
350+ let numeric_cases = & [
351+ BenchCase {
352+ name_tpl : "aggregate {rows} time-series rows" ,
353+ asc : false ,
354+ use_topk : false ,
355+ use_view : false ,
356+ } ,
357+ BenchCase {
358+ name_tpl : "aggregate {rows} worst-case rows" ,
359+ asc : true ,
360+ use_topk : false ,
361+ use_view : false ,
362+ } ,
363+ BenchCase {
364+ name_tpl : "top k={limit} aggregate {rows} time-series rows" ,
365+ asc : false ,
366+ use_topk : true ,
367+ use_view : false ,
368+ } ,
369+ BenchCase {
370+ name_tpl : "top k={limit} aggregate {rows} worst-case rows" ,
371+ asc : true ,
372+ use_topk : true ,
373+ use_view : false ,
374+ } ,
375+ BenchCase {
376+ name_tpl : "top k={limit} aggregate {rows} time-series rows [Utf8View]" ,
377+ asc : false ,
378+ use_topk : true ,
379+ use_view : true ,
380+ } ,
381+ BenchCase {
382+ name_tpl : "top k={limit} aggregate {rows} worst-case rows [Utf8View]" ,
383+ asc : true ,
384+ use_topk : true ,
385+ use_view : true ,
386+ } ,
387+ ] ;
388+ for case in numeric_cases {
389+ let name = case
390+ . name_tpl
391+ . replace ( "{rows}" , & total_rows. to_string ( ) )
392+ . replace ( "{limit}" , & limit. to_string ( ) ) ;
393+ let ctx = rt
394+ . block_on ( create_context (
395+ partitions,
396+ samples,
397+ case. asc ,
398+ case. use_topk ,
399+ case. use_view ,
400+ ) )
401+ . unwrap ( ) ;
402+ c. bench_function ( & name, |b| {
403+ b. iter ( || run ( & rt, ctx. clone ( ) , limit, case. use_topk , case. asc ) )
404+ } ) ;
405+ }
396406
397- let ctx = rt
398- . block_on ( create_context ( partitions, samples, true , true , true ) )
399- . unwrap ( ) ;
400- c. bench_function (
401- format ! (
402- "top k={limit} string aggregate {} worst-case rows [Utf8View]" ,
403- partitions * samples
404- )
405- . as_str ( ) ,
406- |b| b. iter ( || run_string ( & rt, ctx. clone ( ) , limit, true ) ) ,
407- ) ;
407+ assert_string_results_match ( & rt, partitions, samples, limit) ;
408+
409+ let string_cases = & [
410+ StringCase {
411+ asc : false ,
412+ use_topk : false ,
413+ use_view : false ,
414+ } ,
415+ StringCase {
416+ asc : true ,
417+ use_topk : false ,
418+ use_view : false ,
419+ } ,
420+ StringCase {
421+ asc : false ,
422+ use_topk : false ,
423+ use_view : true ,
424+ } ,
425+ StringCase {
426+ asc : true ,
427+ use_topk : false ,
428+ use_view : true ,
429+ } ,
430+ StringCase {
431+ asc : false ,
432+ use_topk : true ,
433+ use_view : false ,
434+ } ,
435+ StringCase {
436+ asc : true ,
437+ use_topk : true ,
438+ use_view : false ,
439+ } ,
440+ StringCase {
441+ asc : false ,
442+ use_topk : true ,
443+ use_view : true ,
444+ } ,
445+ StringCase {
446+ asc : true ,
447+ use_topk : true ,
448+ use_view : true ,
449+ } ,
450+ ] ;
451+ for case in string_cases {
452+ let scenario = if case. asc {
453+ "worst-case"
454+ } else {
455+ "time-series"
456+ } ;
457+ let type_label = if case. use_view { "Utf8View" } else { "Utf8" } ;
458+ let name = if case. use_topk {
459+ format ! (
460+ "top k={limit} string aggregate {total_rows} {scenario} rows [{type_label}]"
461+ )
462+ } else {
463+ format ! ( "string aggregate {total_rows} {scenario} rows [{type_label}]" )
464+ } ;
465+ let ctx = rt
466+ . block_on ( create_context (
467+ partitions,
468+ samples,
469+ case. asc ,
470+ case. use_topk ,
471+ case. use_view ,
472+ ) )
473+ . unwrap ( ) ;
474+ c. bench_function ( & name, |b| {
475+ b. iter ( || run_string ( & rt, ctx. clone ( ) , limit, case. use_topk ) )
476+ } ) ;
477+ }
408478
409479 // DISTINCT benchmarks
410- let ctx = rt. block_on ( async {
411- create_context_distinct ( partitions, samples, false )
412- . await
413- . unwrap ( )
414- } ) ;
415- c. bench_function (
416- format ! ( "distinct {} rows desc [no TopK]" , partitions * samples) . as_str ( ) ,
417- |b| b. iter ( || run_distinct ( & rt, ctx. clone ( ) , limit, false , false ) ) ,
418- ) ;
419-
420- c. bench_function (
421- format ! ( "distinct {} rows asc [no TopK]" , partitions * samples) . as_str ( ) ,
422- |b| b. iter ( || run_distinct ( & rt, ctx. clone ( ) , limit, false , true ) ) ,
423- ) ;
424-
425- let ctx_topk = rt. block_on ( async {
426- create_context_distinct ( partitions, samples, true )
427- . await
428- . unwrap ( )
429- } ) ;
430- c. bench_function (
431- format ! ( "distinct {} rows desc [TopK]" , partitions * samples) . as_str ( ) ,
432- |b| b. iter ( || run_distinct ( & rt, ctx_topk. clone ( ) , limit, true , false ) ) ,
433- ) ;
434-
435- c. bench_function (
436- format ! ( "distinct {} rows asc [TopK]" , partitions * samples) . as_str ( ) ,
437- |b| b. iter ( || run_distinct ( & rt, ctx_topk. clone ( ) , limit, true , true ) ) ,
438- ) ;
480+ for use_topk in [ false , true ] {
481+ let ctx = rt. block_on ( async {
482+ create_context_distinct ( partitions, samples, use_topk)
483+ . await
484+ . unwrap ( )
485+ } ) ;
486+ let topk_label = if use_topk { "TopK" } else { "no TopK" } ;
487+ for asc in [ false , true ] {
488+ let dir = if asc { "asc" } else { "desc" } ;
489+ let name = format ! ( "distinct {total_rows} rows {dir} [{topk_label}]" ) ;
490+ c. bench_function ( & name, |b| {
491+ b. iter ( || run_distinct ( & rt, ctx. clone ( ) , limit, use_topk, asc) )
492+ } ) ;
493+ }
494+ }
439495}
440496
441497criterion_group ! ( benches, criterion_benchmark) ;
0 commit comments