@@ -38,6 +38,7 @@ use bytes::Bytes;
3838use datafusion_datasource:: file_scan_config:: FileScanConfigBuilder ;
3939use datafusion_datasource:: source:: DataSourceExec ;
4040use datafusion_datasource_parquet:: metadata:: DFParquetMetadata ;
41+ use datafusion_datasource_parquet:: { ParquetAccessPlan , RowGroupAccess } ;
4142use futures:: future:: BoxFuture ;
4243use futures:: { FutureExt , TryFutureExt } ;
4344use insta:: assert_snapshot;
@@ -49,6 +50,7 @@ use parquet::arrow::arrow_reader::ArrowReaderOptions;
4950use parquet:: arrow:: async_reader:: AsyncFileReader ;
5051use parquet:: errors:: ParquetError ;
5152use parquet:: file:: metadata:: ParquetMetaData ;
53+ use parquet:: file:: properties:: WriterProperties ;
5254
/// Marker payload attached to a [`PartitionedFile`] as a typed extension; the
/// in-memory test reader factory checks that exactly this value reaches it.
const EXPECTED_USER_DEFINED_METADATA: &str = "some-user-defined-metadata";
5456
@@ -107,6 +109,82 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
107109 " ) ;
108110}
109111
112+ /// Regression test for the type-keyed extensions map: independent components
113+ /// must be able to attach their own per-file payloads on the same
114+ /// [`PartitionedFile`] without colliding. Here we attach a custom reader
115+ /// payload (the `String` checked by [`InMemoryParquetFileReaderFactory`])
116+ /// *and* a [`ParquetAccessPlan`] that skips the first row group, then verify
117+ /// (a) the factory still sees its payload (its internal `assert_eq!` would
118+ /// fire if the slot got overwritten) and (b) the access plan is honored — so
119+ /// only the second row group's 5 rows come out, not all 10.
120+ #[ tokio:: test]
121+ async fn custom_payload_and_access_plan_coexist ( ) {
122+ // Two row groups of 5 rows each: values 0..=4 in row group 0, 5..=9 in
123+ // row group 1.
124+ let c1: ArrayRef = Arc :: new ( Int64Array :: from ( ( 0 ..10 ) . collect :: < Vec < i64 > > ( ) ) ) ;
125+ let batch = create_batch ( vec ! [ ( "c1" , c1) ] ) ;
126+ let file_schema = batch. schema ( ) . clone ( ) ;
127+
128+ let in_memory = InMemory :: new ( ) ;
129+ let mut buf = Vec :: < u8 > :: with_capacity ( 32 * 1024 ) ;
130+ let props = WriterProperties :: builder ( )
131+ . set_max_row_group_row_count ( Some ( 5 ) )
132+ . build ( ) ;
133+ let mut writer = ArrowWriter :: try_new ( & mut buf, batch. schema ( ) , Some ( props) ) . unwrap ( ) ;
134+ writer. write ( & batch) . unwrap ( ) ;
135+ writer. close ( ) . unwrap ( ) ;
136+
137+ let location = Path :: parse ( "two-row-groups.parquet" ) . unwrap ( ) ;
138+ let size = buf. len ( ) as u64 ;
139+ in_memory
140+ . put ( & location, Bytes :: from ( buf) . into ( ) )
141+ . await
142+ . unwrap ( ) ;
143+ let meta = ObjectMeta {
144+ location,
145+ last_modified : chrono:: DateTime :: from ( SystemTime :: now ( ) ) ,
146+ size,
147+ e_tag : None ,
148+ version : None ,
149+ } ;
150+
151+ let access_plan =
152+ ParquetAccessPlan :: new ( vec ! [ RowGroupAccess :: Skip , RowGroupAccess :: Scan ] ) ;
153+ let pf = PartitionedFile :: new_from_meta ( meta)
154+ . with_extension ( String :: from ( EXPECTED_USER_DEFINED_METADATA ) )
155+ . with_extension ( access_plan) ;
156+
157+ let store: Arc < dyn ObjectStore > = Arc :: new ( in_memory) ;
158+ let source = Arc :: new (
159+ ParquetSource :: new ( file_schema. clone ( ) ) . with_parquet_file_reader_factory (
160+ Arc :: new ( InMemoryParquetFileReaderFactory ( Arc :: clone ( & store) ) ) ,
161+ ) ,
162+ ) ;
163+ let base_config =
164+ FileScanConfigBuilder :: new ( ObjectStoreUrl :: local_filesystem ( ) , source)
165+ . with_file_group ( vec ! [ pf] . into ( ) )
166+ . build ( ) ;
167+ let parquet_exec = DataSourceExec :: from_data_source ( base_config) ;
168+
169+ let session_ctx = SessionContext :: new ( ) ;
170+ let read = collect ( parquet_exec, session_ctx. task_ctx ( ) ) . await . unwrap ( ) ;
171+
172+ let total: usize = read. iter ( ) . map ( |b| b. num_rows ( ) ) . sum ( ) ;
173+ assert_eq ! (
174+ total, 5 ,
175+ "access plan should have skipped the first row group"
176+ ) ;
177+
178+ let values: Vec < i64 > = read
179+ . iter ( )
180+ . flat_map ( |b| {
181+ let arr = b. column ( 0 ) . as_any ( ) . downcast_ref :: < Int64Array > ( ) . unwrap ( ) ;
182+ ( 0 ..arr. len ( ) ) . map ( |i| arr. value ( i) ) . collect :: < Vec < _ > > ( )
183+ } )
184+ . collect ( ) ;
185+ assert_eq ! ( values, vec![ 5 , 6 , 7 , 8 , 9 ] ) ;
186+ }
187+
/// Test parquet reader factory backed by the wrapped in-memory
/// [`ObjectStore`]. Per the regression test above, it also asserts that the
/// expected `String` extension payload is present on each file it is asked
/// to read (see [`EXPECTED_USER_DEFINED_METADATA`]).
#[derive(Debug)]
struct InMemoryParquetFileReaderFactory(Arc<dyn ObjectStore>);
112190
0 commit comments