@@ -56,13 +56,15 @@ use crate::sort::reverse_row_selection;
 use datafusion_common::config::EncryptionFactoryOptions;
 #[cfg(feature = "parquet_encryption")]
 use datafusion_execution::parquet_encryption::EncryptionFactory;
-use futures::{Stream, StreamExt, TryStreamExt, ready};
+use futures::{Stream, StreamExt, ready};
 use log::debug;
+use parquet::DecodeResult;
 use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
 use parquet::arrow::arrow_reader::{
     ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy,
 };
 use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
 use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader, RowGroupMetaData};

@@ -167,17 +169,6 @@ impl PreparedAccessPlan {

         Ok(self)
     }
-
-    /// Apply this access plan to a ParquetRecordBatchStreamBuilder
-    fn apply_to_builder(
-        self,
-        mut builder: ParquetRecordBatchStreamBuilder<Box<dyn AsyncFileReader>>,
-    ) -> ParquetRecordBatchStreamBuilder<Box<dyn AsyncFileReader>> {
-        if let Some(row_selection) = self.row_selection {
-            builder = builder.with_row_selection(row_selection);
-        }
-        builder.with_row_groups(self.row_group_indexes)
-    }
 }

 impl FileOpener for ParquetOpener {
@@ -267,6 +258,9 @@ impl FileOpener for ParquetOpener {
         let enable_bloom_filter = self.enable_bloom_filter;
         let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
         let limit = self.limit;
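+        // Captured so the async block below can create a fresh reader
+        // during bloom filter pruning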
+        let parquet_file_reader_factory = Arc::clone(&self.parquet_file_reader_factory);
+        let partition_index = self.partition_index;
+        let metrics = self.metrics.clone();

         let predicate_creation_errors = MetricBuilder::new(&self.metrics)
             .global_counter("num_predicate_creation_errors");
@@ -444,57 +438,14 @@ impl FileOpener for ParquetOpener {

             metadata_timer.stop();

-            // ---------------------------------------------------------
-            // Step: construct builder for the final RecordBatch stream
-            // ---------------------------------------------------------
-
-            let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
-                async_file_reader,
-                reader_metadata,
-            );
-
-            // ---------------------------------------------------------------------
-            // Step: optionally add row filter to the builder
-            //
-            // Row filter is used for late materialization in parquet decoding, see
-            // `row_filter` for details.
-            // ---------------------------------------------------------------------
-
-            // Filter pushdown: evaluate predicates during scan
-            if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
-                let row_filter = row_filter::build_row_filter(
-                    &predicate,
-                    &physical_file_schema,
-                    builder.metadata(),
-                    reorder_predicates,
-                    &file_metrics,
-                );
-
-                match row_filter {
-                    Ok(Some(filter)) => {
-                        builder = builder.with_row_filter(filter);
-                    }
-                    Ok(None) => {}
-                    Err(e) => {
-                        debug!(
-                            "Ignoring error building row filter for '{predicate:?}': {e}"
-                        );
-                    }
-                };
-            };
-            if force_filter_selections {
-                builder =
-                    builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
-            }
-
             // ------------------------------------------------------------
             // Step: prune row groups by range, predicate and bloom filter
             // ------------------------------------------------------------

             // Determine which row groups to actually read. The idea is to skip
             // as many row groups as possible based on the metadata and query
-            let file_metadata = Arc::clone(builder.metadata());
-            let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
+            let file_metadata = Arc::clone(reader_metadata.metadata());
+            let pruning_pred = pruning_predicate.as_ref().map(|p| p.as_ref());
             let rg_metadata = file_metadata.row_groups();
             // track which row groups to actually read
             let access_plan =
@@ -506,13 +457,13 @@ impl FileOpener for ParquetOpener {
             }

             // If there is a predicate that can be evaluated against the metadata
-            if let Some(predicate) = predicate.as_ref() {
+            if let Some(pruning_pred) = pruning_pred.as_ref() {
                 if enable_row_group_stats_pruning {
                     row_groups.prune_by_statistics(
                         &physical_file_schema,
-                        builder.parquet_schema(),
+                        reader_metadata.parquet_schema(),
                         rg_metadata,
-                        predicate,
+                        pruning_pred,
                         &file_metrics,
                     );
                 } else {
@@ -524,11 +475,27 @@ impl FileOpener for ParquetOpener {
             }

             if enable_bloom_filter && !row_groups.is_empty() {
+                // Use the existing reader for bloom filter I/O;
+                // replace with a fresh reader for decoding below.
+                let bf_reader = std::mem::replace(
+                    &mut async_file_reader,
+                    parquet_file_reader_factory.create_reader(
+                        partition_index,
+                        partitioned_file.clone(),
+                        metadata_size_hint,
+                        &metrics,
+                    )?,
+                );
+                let mut bf_builder =
+                    ParquetRecordBatchStreamBuilder::new_with_metadata(
+                        bf_reader,
+                        reader_metadata.clone(),
+                    );
                 row_groups
                     .prune_by_bloom_filters(
                         &physical_file_schema,
-                        &mut builder,
-                        predicate,
+                        &mut bf_builder,
+                        pruning_pred,
                         &file_metrics,
                     )
                     .await;
@@ -570,7 +537,7 @@ impl FileOpener for ParquetOpener {
                 access_plan = p.prune_plan_with_page_index(
                     access_plan,
                     &physical_file_schema,
-                    builder.parquet_schema(),
+                    reader_metadata.parquet_schema(),
                     file_metadata.as_ref(),
                     &file_metrics,
                 );
@@ -588,8 +555,59 @@ impl FileOpener for ParquetOpener {
                 prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
             }

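+            // Everything was pruned; there is nothing to read or decode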
+            if prepared_plan.row_group_indexes.is_empty() {
+                return Ok(futures::stream::empty().boxed());
+            }
+
+            // ---------------------------------------------------------
+            // Step: construct builder for the final RecordBatch stream
+            // ---------------------------------------------------------
+
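+            // Unlike ParquetRecordBatchStreamBuilder, the push decoder does
+            // no I/O itself; the stream below feeds it the byte ranges it
+            // requests.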
+            let mut builder =
+                ParquetPushDecoderBuilder::new_with_metadata(reader_metadata.clone())
+                    .with_batch_size(batch_size);
+
+            // ---------------------------------------------------------------------
+            // Step: optionally add row filter to the builder
+            //
+            // Row filter is used for late materialization in parquet decoding, see
+            // `row_filter` for details.
+            // ---------------------------------------------------------------------
+
+            // Filter pushdown: evaluate predicates during scan
+            if let Some(predicate) =
+                pushdown_filters.then_some(predicate.as_ref()).flatten()
+            {
+                let row_filter = row_filter::build_row_filter(
+                    predicate,
+                    &physical_file_schema,
+                    file_metadata.as_ref(),
+                    reorder_predicates,
+                    &file_metrics,
+                );
+
+                match row_filter {
+                    Ok(Some(filter)) => {
+                        builder = builder.with_row_filter(filter);
+                    }
+                    Ok(None) => {}
+                    Err(e) => {
+                        debug!(
+                            "Ignoring error building row filter for '{predicate:?}': {e}"
+                        );
+                    }
+                };
+            };
+            if force_filter_selections {
+                builder =
+                    builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
+            }
+
             // Apply the prepared plan to the builder
-            builder = prepared_plan.apply_to_builder(builder);
+            if let Some(row_selection) = prepared_plan.row_selection {
+                builder = builder.with_row_selection(row_selection);
+            }
+            builder = builder.with_row_groups(prepared_plan.row_group_indexes);

             if let Some(limit) = limit {
                 builder = builder.with_limit(limit)
@@ -603,11 +621,11 @@ impl FileOpener for ParquetOpener {
             let arrow_reader_metrics = ArrowReaderMetrics::enabled();

             let indices = projection.column_indices();
-            let mask = ProjectionMask::roots(builder.parquet_schema(), indices);
+            let mask =
+                ProjectionMask::roots(reader_metadata.parquet_schema(), indices.clone());

-            let stream = builder
+            let decoder = builder
                 .with_projection(mask)
-                .with_batch_size(batch_size)
                 .with_metrics(arrow_reader_metrics.clone())
                 .build()?;

@@ -617,57 +635,76 @@ impl FileOpener for ParquetOpener {
                 file_metrics.predicate_cache_inner_records.clone();
             let predicate_cache_records = file_metrics.predicate_cache_records.clone();

-            let stream_schema = Arc::clone(stream.schema());
-            // Check if we need to replace the schema to handle things like differing nullability or metadata.
-            // See note below about file vs. output schema.
-            let replace_schema = !stream_schema.eq(&output_schema);
-
             // Rebase column indices to match the narrowed stream schema.
             // The projection expressions have indices based on physical_file_schema,
             // but the stream only contains the columns selected by the ProjectionMask.
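+            // Recompute the stream schema by projecting the physical file
+            // schema with the mask's indices (previously obtained from
+            // `stream.schema()` on the pull-based stream).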
+            let stream_schema = Arc::new(physical_file_schema.project(&indices)?);
+            let replace_schema = stream_schema != output_schema;
             let projection = projection
                 .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
-
             let projector = projection.make_projector(&stream_schema)?;
-
-            let stream = stream.map_err(DataFusionError::from).map(move |b| {
-                b.and_then(|mut b| {
-                    copy_arrow_reader_metrics(
-                        &arrow_reader_metrics,
-                        &predicate_cache_inner_records,
-                        &predicate_cache_records,
-                    );
-                    b = projector.project_batch(&b)?;
-                    if replace_schema {
-                        // Ensure the output batch has the expected schema.
-                        // This handles things like schema level and field level metadata, which may not be present
-                        // in the physical file schema.
-                        // It is also possible for nullability to differ; some writers create files with
-                        // OPTIONAL fields even when there are no nulls in the data.
-                        // In these cases it may make sense for the logical schema to be `NOT NULL`.
-                        // RecordBatch::try_new_with_options checks that if the schema is NOT NULL
-                        // the array cannot contain nulls, amongst other checks.
-                        let (_stream_schema, arrays, num_rows) = b.into_parts();
-                        let options =
-                            RecordBatchOptions::new().with_row_count(Some(num_rows));
-                        RecordBatch::try_new_with_options(
-                            Arc::clone(&output_schema),
-                            arrays,
-                            &options,
-                        )
-                        .map_err(Into::into)
-                    } else {
-                        Ok(b)
+            let stream = futures::stream::unfold(
+                PushDecoderStreamState {
+                    decoder,
+                    reader: async_file_reader,
+                    projector,
+                    output_schema,
+                    replace_schema,
+                    arrow_reader_metrics,
+                    predicate_cache_inner_records,
+                    predicate_cache_records,
+                },
+                |mut state| async move {
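+                    // Drive the push decoder: fetch and push the byte ranges
+                    // it requests, emit each decoded batch after projection,
+                    // and end the stream when decoding is finished.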
+                    loop {
+                        match state.decoder.try_decode() {
+                            Ok(DecodeResult::NeedsData(ranges)) => {
+                                match state.reader.get_byte_ranges(ranges.clone()).await {
+                                    Ok(data) => {
+                                        if let Err(e) =
+                                            state.decoder.push_ranges(ranges, data)
+                                        {
+                                            return Some((
+                                                Err(DataFusionError::from(e)),
+                                                state,
+                                            ));
+                                        }
+                                    }
+                                    Err(e) => {
+                                        return Some((
+                                            Err(DataFusionError::from(e)),
+                                            state,
+                                        ));
+                                    }
+                                }
+                            }
+                            Ok(DecodeResult::Data(batch)) => {
+                                copy_arrow_reader_metrics(
+                                    &state.arrow_reader_metrics,
+                                    &state.predicate_cache_inner_records,
+                                    &state.predicate_cache_records,
+                                );
+                                let result = state.project_batch(&batch);
+                                return Some((result, state));
+                            }
+                            Ok(DecodeResult::Finished) => {
+                                return None;
+                            }
+                            Err(e) => {
+                                return Some((Err(DataFusionError::from(e)), state));
+                            }
+                        }
                     }
-                })
-            });
+                },
+            )
+            .fuse();

             // ----------------------------------------------------------------------
             // Step: wrap the stream so a dynamic filter can stop the file scan early
             // ----------------------------------------------------------------------
             if let Some(file_pruner) = file_pruner {
+                let boxed_stream = stream.boxed();
                 Ok(EarlyStoppingStream::new(
-                    stream,
+                    boxed_stream,
                     file_pruner,
                     files_ranges_pruned_statistics,
                 )
@@ -679,6 +716,33 @@ impl FileOpener for ParquetOpener {
     }
 }

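+/// State carried across iterations of the `futures::stream::unfold` loop
+/// that drives the push decoder.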
+struct PushDecoderStreamState {
+    decoder: parquet::arrow::push_decoder::ParquetPushDecoder,
+    reader: Box<dyn AsyncFileReader>,
+    projector: datafusion_physical_expr::projection::Projector,
+    output_schema: Arc<arrow::datatypes::Schema>,
+    replace_schema: bool,
+    arrow_reader_metrics: ArrowReaderMetrics,
+    predicate_cache_inner_records: Gauge,
+    predicate_cache_records: Gauge,
+}
+
+impl PushDecoderStreamState {
+    fn project_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
+        let mut batch = self.projector.project_batch(batch)?;
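+        // Ensure the output batch has the expected schema: schema and field
+        // level metadata and nullability may differ from the physical file
+        // schema.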
+        if self.replace_schema {
+            let (_schema, arrays, num_rows) = batch.into_parts();
+            let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
+            batch = RecordBatch::try_new_with_options(
+                Arc::clone(&self.output_schema),
+                arrays,
+                &options,
+            )?;
+        }
+        Ok(batch)
+    }
+}
+
 /// Copies metrics from ArrowReaderMetrics (the metrics collected by the
 /// arrow-rs parquet reader) to the parquet file metrics for DataFusion
 fn copy_arrow_reader_metrics(