@@ -24,7 +24,8 @@ use crate::file_groups::FileGroup;
2424use crate :: {
2525 PartitionedFile , display:: FileGroupsDisplay , file:: FileSource ,
2626 file_compression_type:: FileCompressionType , file_stream:: FileStreamBuilder ,
27- source:: DataSource , statistics:: MinMaxStatistics ,
27+ file_stream:: work_source:: SharedWorkSource , source:: DataSource ,
28+ statistics:: MinMaxStatistics ,
2829} ;
2930use arrow:: datatypes:: FieldRef ;
3031use arrow:: datatypes:: { DataType , Schema , SchemaRef } ;
@@ -38,6 +39,7 @@ use datafusion_execution::{
3839} ;
3940use datafusion_expr:: Operator ;
4041
42+ use crate :: source:: OpenArgs ;
4143use datafusion_physical_expr:: expressions:: { BinaryExpr , Column } ;
4244use datafusion_physical_expr:: projection:: ProjectionExprs ;
4345use datafusion_physical_expr:: utils:: reassign_expr_columns;
@@ -55,6 +57,7 @@ use datafusion_physical_plan::{
5557 metrics:: ExecutionPlanMetricsSet ,
5658} ;
5759use log:: { debug, warn} ;
60+ use std:: any:: Any ;
5861use std:: { fmt:: Debug , fmt:: Formatter , fmt:: Result as FmtResult , sync:: Arc } ;
5962
6063/// [`FileScanConfig`] represents scanning data from a group of files
@@ -578,6 +581,15 @@ impl DataSource for FileScanConfig {
578581 partition : usize ,
579582 context : Arc < TaskContext > ,
580583 ) -> Result < SendableRecordBatchStream > {
584+ self . open_with_args ( OpenArgs :: new ( partition, context) )
585+ }
586+
587+ fn open_with_args ( & self , args : OpenArgs ) -> Result < SendableRecordBatchStream > {
588+ let OpenArgs {
589+ partition,
590+ context,
591+ sibling_state,
592+ } = args;
581593 let object_store = context. runtime_env ( ) . object_store ( & self . object_store_url ) ?;
582594 let batch_size = self
583595 . batch_size
@@ -587,8 +599,17 @@ impl DataSource for FileScanConfig {
587599
588600 let morselizer = source. create_morselizer ( object_store, self , partition) ?;
589601
602+ // Extract the shared work source from the sibling state if it exists.
603+ // This allows multiple sibling streams to steal work from a single
604+ // shared queue of unopened files.
605+ let shared_work_source = sibling_state
606+ . as_ref ( )
607+ . and_then ( |state| state. downcast_ref :: < SharedWorkSource > ( ) )
608+ . cloned ( ) ;
609+
590610 let stream = FileStreamBuilder :: new ( self )
591611 . with_partition ( partition)
612+ . with_shared_work_source ( shared_work_source)
592613 . with_morselizer ( morselizer)
593614 . with_metrics ( source. metrics ( ) )
594615 . build ( ) ?;
@@ -985,6 +1006,20 @@ impl DataSource for FileScanConfig {
9851006 // Delegate to the file source
9861007 self . file_source . apply_expressions ( f)
9871008 }
1009+
1010+ /// Create any shared state that should be passed between sibling streams
1011+ /// during one execution.
1012+ ///
1013+ /// This returns `None` when sibling streams must not share work, such as
1014+ /// when file order must be preserved or the file groups define the output
1015+ /// partitioning needed for the rest of the plan
1016+ fn create_sibling_state ( & self ) -> Option < Arc < dyn Any + Send + Sync > > {
1017+ if self . preserve_order || self . partitioned_by_file_group {
1018+ return None ;
1019+ }
1020+
1021+ Some ( Arc :: new ( SharedWorkSource :: from_config ( self ) ) as Arc < dyn Any + Send + Sync > )
1022+ }
9881023}
9891024
9901025impl FileScanConfig {
@@ -1362,19 +1397,33 @@ mod tests {
13621397
13631398 use super :: * ;
13641399 use crate :: TableSchema ;
1400+ use crate :: source:: DataSourceExec ;
13651401 use crate :: test_util:: col;
13661402 use crate :: {
13671403 generate_test_files, test_util:: MockSource , tests:: aggr_test_schema,
13681404 verify_sort_integrity,
13691405 } ;
13701406
1407+ use arrow:: array:: { Int32Array , RecordBatch } ;
13711408 use arrow:: datatypes:: Field ;
13721409 use datafusion_common:: ColumnStatistics ;
13731410 use datafusion_common:: stats:: Precision ;
1411+ use datafusion_common:: tree_node:: TreeNodeRecursion ;
1412+ use datafusion_common:: { Result , assert_batches_eq, internal_err} ;
1413+ use datafusion_execution:: TaskContext ;
13741414 use datafusion_expr:: SortExpr ;
1415+ use datafusion_physical_expr:: PhysicalExpr ;
13751416 use datafusion_physical_expr:: create_physical_sort_expr;
13761417 use datafusion_physical_expr:: expressions:: Literal ;
13771418 use datafusion_physical_expr:: projection:: ProjectionExpr ;
1419+ use datafusion_physical_expr:: projection:: ProjectionExprs ;
1420+ use datafusion_physical_plan:: ExecutionPlan ;
1421+ use datafusion_physical_plan:: execution_plan:: collect;
1422+ use futures:: FutureExt as _;
1423+ use futures:: StreamExt as _;
1424+ use futures:: stream;
1425+ use object_store:: ObjectStore ;
1426+ use std:: fmt:: Debug ;
13781427
13791428 #[ derive( Clone ) ]
13801429 struct InexactSortPushdownSource {
@@ -1394,7 +1443,7 @@ mod tests {
13941443 impl FileSource for InexactSortPushdownSource {
13951444 fn create_file_opener (
13961445 & self ,
1397- _object_store : Arc < dyn object_store :: ObjectStore > ,
1446+ _object_store : Arc < dyn ObjectStore > ,
13981447 _base_config : & FileScanConfig ,
13991448 _partition : usize ,
14001449 ) -> Result < Arc < dyn crate :: file_stream:: FileOpener > > {
@@ -2278,6 +2327,88 @@ mod tests {
22782327 assert_eq ! ( partition_stats. total_byte_size, Precision :: Exact ( 800 ) ) ;
22792328 }
22802329
2330+ /// Regression test for reusing a `DataSourceExec` after its execution-local
2331+ /// shared work queue has been drained.
2332+ ///
2333+ /// This test uses a single file group with two files so the scan creates a
2334+ /// shared unopened-file queue. Executing after `reset_state` must recreate
2335+ /// the shared queue and return the same rows again.
2336+ #[ tokio:: test]
2337+ async fn reset_state_recreates_shared_work_source ( ) -> Result < ( ) > {
// Single-column Int32 schema shared by both synthetic test files.
2338+ let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new(
2339+ "value" ,
2340+ DataType :: Int32 ,
2341+ false ,
2342+ ) ] ) ) ;
// MockSource delegates file opening to the test opener below, which
// synthesizes one batch per file from the file name.
2343+ let file_source = Arc :: new (
2344+ MockSource :: new ( Arc :: clone ( & schema) )
2345+ . with_file_opener ( Arc :: new ( ResetStateTestFileOpener { schema } ) ) ,
2346+ ) ;
2347+
// Two files in ONE group: this is the configuration under which
// `create_sibling_state` builds a shared work queue (see the impl above).
2348+ let config =
2349+ FileScanConfigBuilder :: new ( ObjectStoreUrl :: local_filesystem ( ) , file_source)
2350+ . with_file_group ( FileGroup :: new ( vec ! [
2351+ PartitionedFile :: new( "file1.parquet" , 100 ) ,
2352+ PartitionedFile :: new( "file2.parquet" , 100 ) ,
2353+ ] ) )
2354+ . build ( ) ;
2355+
2356+ let exec: Arc < dyn ExecutionPlan > = DataSourceExec :: from_data_source ( config) ;
2357+ let task_ctx = Arc :: new ( TaskContext :: default ( ) ) ;
2358+
// Running the same scan after resetting the state should
// produce the same answer.
2361+ let first_run = collect ( Arc :: clone ( & exec) , Arc :: clone ( & task_ctx) ) . await ?;
2362+ let reset_exec = exec. reset_state ( ) ?;
2363+ let second_run = collect ( reset_exec, task_ctx) . await ?;
2364+
// NOTE(review): the cell padding in the expected rows below looks
// whitespace-collapsed in this view; confirm the `| 1 |` / `| 2 |` cells
// align with arrow's pretty-printed column width in the real file.
2365+ let expected = [
2366+ "+-------+" ,
2367+ "| value |" ,
2368+ "+-------+" ,
2369+ "| 1 |" ,
2370+ "| 2 |" ,
2371+ "+-------+" ,
2372+ ] ;
2373+ assert_batches_eq ! ( expected, & first_run) ;
2374+ assert_batches_eq ! ( expected, & second_run) ;
2375+
2376+ Ok ( ( ) )
2377+ }
2378+
2379+ /// Test-only `FileOpener` that turns file names like `file1.parquet` into a
2380+ /// single-batch stream containing that numeric value
2381+ #[ derive( Debug ) ]
2382+ struct ResetStateTestFileOpener {
2383+ schema : SchemaRef ,
2384+ }
2385+
2386+ impl crate :: file_stream:: FileOpener for ResetStateTestFileOpener {
2387+ fn open (
2388+ & self ,
2389+ file : PartitionedFile ,
2390+ ) -> Result < crate :: file_stream:: FileOpenFuture > {
2391+ let value = file
2392+ . object_meta
2393+ . location
2394+ . as_ref ( )
2395+ . trim_start_matches ( "file" )
2396+ . trim_end_matches ( ".parquet" )
2397+ . parse :: < i32 > ( )
2398+ . expect ( "invalid test file name" ) ;
2399+ let schema = Arc :: clone ( & self . schema ) ;
2400+ Ok ( async move {
2401+ let batch = RecordBatch :: try_new (
2402+ schema,
2403+ vec ! [ Arc :: new( Int32Array :: from( vec![ value] ) ) ] ,
2404+ )
2405+ . expect ( "test batch should be valid" ) ;
2406+ Ok ( stream:: iter ( vec ! [ Ok ( batch) ] ) . boxed ( ) )
2407+ }
2408+ . boxed ( ) )
2409+ }
2410+ }
2411+
22812412 #[ test]
22822413 fn test_output_partitioning_not_partitioned_by_file_group ( ) {
22832414 let file_schema = aggr_test_schema ( ) ;
@@ -2461,7 +2592,7 @@ mod tests {
24612592 impl FileSource for ExactSortPushdownSource {
24622593 fn create_file_opener (
24632594 & self ,
2464- _object_store : Arc < dyn object_store :: ObjectStore > ,
2595+ _object_store : Arc < dyn ObjectStore > ,
24652596 _base_config : & FileScanConfig ,
24662597 _partition : usize ,
24672598 ) -> Result < Arc < dyn crate :: file_stream:: FileOpener > > {
0 commit comments