@@ -107,14 +107,6 @@ struct Args {
107107 #[ arg( long, default_value_t = false , conflicts_with = "explain" ) ]
108108 print_results : bool ,
109109
110- /// Regenerate `.slt.no` reference files from actual query output.
111- #[ arg(
112- long,
113- default_value_t = false ,
114- conflicts_with_all = [ "explain" , "validate" ]
115- ) ]
116- regenerate_slt : bool ,
117-
118110 #[ arg( long, value_delimiter = ',' , value_parser = value_parser!( Format ) ) ]
119111 formats : Vec < Format > ,
120112
@@ -179,71 +171,108 @@ async fn main() -> anyhow::Result<()> {
179171 Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ;
180172 let show_metrics = args. show_metrics ;
181173
182- let mode = if args. explain {
183- BenchmarkMode :: Explain
184- } else if args. regenerate_slt {
185- BenchmarkMode :: RegenerateSlt
186- } else if args. validate {
187- BenchmarkMode :: Run {
188- iterations : 1 ,
189- validate : true ,
190- print_results : false ,
174+ let validate = args. validate || std:: env:: var ( "CI" ) . is_ok ( ) ;
175+ let iterations = if args. validate { 1 } else { args. iterations } ;
176+
177+ if let Some ( slt_path) = benchmark. slt_path ( "datafusion" ) {
178+ for & format in & args. formats {
179+ let session = Arc :: new ( datafusion_bench:: get_session_context ( ) ) ;
180+ datafusion_bench:: make_object_store ( & session, benchmark. data_url ( ) ) ?;
181+ register_benchmark_tables ( & session, & * benchmark, format) . await ?;
182+
183+ runner
184+ . run_slt_async (
185+ & slt_path,
186+ "datafusion" ,
187+ format,
188+ iterations,
189+ validate,
190+ args. queries . as_ref ( ) ,
191+ args. exclude_queries . as_ref ( ) ,
192+ |sql| {
193+ let session = Arc :: clone ( & session) ;
194+ let sql = sql. to_string ( ) ;
195+ async move {
196+ session. sql ( & sql) . await ?. collect ( ) . await ?;
197+ Ok ( ( ) )
198+ }
199+ } ,
200+ |query| {
201+ let session = Arc :: clone ( & session) ;
202+ let plans = Arc :: clone ( & collected_plans) ;
203+ Box :: pin ( async move {
204+ let timer = Instant :: now ( ) ;
205+ let ( batches, plan) = execute_query ( & session, query) . await ?;
206+ let time = timer. elapsed ( ) ;
207+
208+ if show_metrics {
209+ let mut plans_mut = plans. lock ( ) ;
210+ plans_mut. push ( ( 0 , format, plan. clone ( ) ) ) ;
211+ }
212+
213+ anyhow:: Ok ( ( Some ( time) , DataFusionQueryResult ( batches) ) )
214+ } )
215+ } ,
216+ )
217+ . await ?;
191218 }
192219 } else {
193- BenchmarkMode :: Run {
194- iterations : args. iterations ,
195- validate : std:: env:: var ( "CI" ) . is_ok ( ) ,
196- print_results : args. print_results ,
197- }
198- } ;
199-
200- runner
201- . run_all_async (
202- & filtered_queries,
203- mode,
204- |format| {
205- let benchmark = & * benchmark;
206- async move {
207- let session = datafusion_bench:: get_session_context ( ) ;
208- datafusion_bench:: make_object_store ( & session, benchmark. data_url ( ) ) ?;
209- register_benchmark_tables ( & session, benchmark, format) . await ?;
210- Ok ( ( session, format) )
211- }
212- } ,
213- |query_idx, ( session, format) , query| {
214- let plans = Arc :: clone ( & collected_plans) ;
215-
216- let labelset = set_labels ( benchmark_name. clone ( ) , query_idx, * format) ;
220+ let mode = if args. explain {
221+ BenchmarkMode :: Explain
222+ } else {
223+ BenchmarkMode :: Run {
224+ iterations,
225+ validate,
226+ print_results : args. print_results ,
227+ }
228+ } ;
217229
218- Box :: pin (
230+ runner
231+ . run_all_async (
232+ & filtered_queries,
233+ mode,
234+ |format| {
235+ let benchmark = & * benchmark;
219236 async move {
220- let timer = Instant :: now ( ) ;
221- let ( batches, plan) = execute_query ( session, query)
222- . with_labelset ( get_labelset_from_global ( ) )
223- . await ?;
224- let time = timer. elapsed ( ) ;
225-
226- // Store plan for metrics (only store once per query/format combination)
227- if show_metrics {
228- let mut plans_mut = plans. lock ( ) ;
229- // Only store if we don't already have this query/format combo
230- if !plans_mut
231- . iter ( )
232- . any ( |( idx, f, _) | * idx == query_idx && * f == * format)
233- {
234- plans_mut. push ( ( query_idx, * format, plan. clone ( ) ) ) ;
237+ let session = datafusion_bench:: get_session_context ( ) ;
238+ datafusion_bench:: make_object_store ( & session, benchmark. data_url ( ) ) ?;
239+ register_benchmark_tables ( & session, benchmark, format) . await ?;
240+ Ok ( ( session, format) )
241+ }
242+ } ,
243+ |query_idx, ( session, format) , query| {
244+ let plans = Arc :: clone ( & collected_plans) ;
245+
246+ let labelset = set_labels ( benchmark_name. clone ( ) , query_idx, * format) ;
247+
248+ Box :: pin (
249+ async move {
250+ let timer = Instant :: now ( ) ;
251+ let ( batches, plan) = execute_query ( session, query)
252+ . with_labelset ( get_labelset_from_global ( ) )
253+ . await ?;
254+ let time = timer. elapsed ( ) ;
255+
256+ if show_metrics {
257+ let mut plans_mut = plans. lock ( ) ;
258+ if !plans_mut
259+ . iter ( )
260+ . any ( |( idx, f, _) | * idx == query_idx && * f == * format)
261+ {
262+ plans_mut. push ( ( query_idx, * format, plan. clone ( ) ) ) ;
263+ }
235264 }
236- }
237265
238- anyhow:: Ok ( ( Some ( time) , DataFusionQueryResult ( batches) ) )
239- }
240- . with_labelset ( labelset) ,
241- )
242- } ,
243- )
244- . await ?;
266+ anyhow:: Ok ( ( Some ( time) , DataFusionQueryResult ( batches) ) )
267+ }
268+ . with_labelset ( labelset) ,
269+ )
270+ } ,
271+ )
272+ . await ?;
273+ }
245274
246- if !args. explain && !args . validate && !args . regenerate_slt {
275+ if !args. explain {
247276 // Print metrics if requested
248277 if show_metrics {
249278 let plans = collected_plans. lock ( ) ;
0 commit comments