Skip to content

Commit 1a23f17

Browse files
committed
Test out pushdecoder
1 parent 79e4bf0 commit 1a23f17

File tree

2 files changed

+46
-47
lines changed

2 files changed

+46
-47
lines changed

datafusion/core/tests/datasource/object_store_access.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,7 @@ async fn query_single_parquet_file_with_single_predicate() {
502502
RequestCountingObjectStore()
503503
Total Requests: 2
504504
- GET (opts) path=parquet_table.parquet head=true
505-
- GET (ranges) path=parquet_table.parquet ranges=1064-1594,1594-2124
505+
- GET (ranges) path=parquet_table.parquet ranges=1064-1481,1481-1594,1594-2011,2011-2124
506506
"
507507
);
508508
}
@@ -526,8 +526,8 @@ async fn query_single_parquet_file_multi_row_groups_multiple_predicates() {
526526
RequestCountingObjectStore()
527527
Total Requests: 3
528528
- GET (opts) path=parquet_table.parquet head=true
529-
- GET (ranges) path=parquet_table.parquet ranges=4-534,534-1064
530-
- GET (ranges) path=parquet_table.parquet ranges=1064-1594,1594-2124
529+
- GET (ranges) path=parquet_table.parquet ranges=4-421,421-534,534-951,951-1064
530+
- GET (ranges) path=parquet_table.parquet ranges=1064-1481,1481-1594,1594-2011,2011-2124
531531
"
532532
);
533533
}

datafusion/datasource-parquet/src/opener.rs

Lines changed: 43 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,7 @@ impl FileOpener for ParquetOpener {
445445
// Determine which row groups to actually read. The idea is to skip
446446
// as many row groups as possible based on the metadata and query
447447
let file_metadata = Arc::clone(reader_metadata.metadata());
448-
let predicate_ref = pruning_predicate.as_ref().map(|p| p.as_ref());
448+
let pruning_pred = pruning_predicate.as_ref().map(|p| p.as_ref());
449449
let rg_metadata = file_metadata.row_groups();
450450
// track which row groups to actually read
451451
let access_plan =
@@ -457,13 +457,13 @@ impl FileOpener for ParquetOpener {
457457
}
458458

459459
// If there is a predicate that can be evaluated against the metadata
460-
if let Some(predicate) = predicate_ref.as_ref() {
460+
if let Some(pruning_pred) = pruning_pred.as_ref() {
461461
if enable_row_group_stats_pruning {
462462
row_groups.prune_by_statistics(
463463
&physical_file_schema,
464464
reader_metadata.parquet_schema(),
465465
rg_metadata,
466-
predicate,
466+
pruning_pred,
467467
&file_metrics,
468468
);
469469
} else {
@@ -474,16 +474,18 @@ impl FileOpener for ParquetOpener {
474474
.add_matched(row_groups.remaining_row_group_count());
475475
}
476476

477-
// Bloom filter pruning: create a separate reader for bloom
478-
// filter I/O since the push decoder doesn't own the reader.
479477
if enable_bloom_filter && !row_groups.is_empty() {
480-
let bf_reader: Box<dyn AsyncFileReader> = parquet_file_reader_factory
481-
.create_reader(
478+
// Use the existing reader for bloom filter I/O;
479+
// replace with a fresh reader for decoding below.
480+
let bf_reader = std::mem::replace(
481+
&mut async_file_reader,
482+
parquet_file_reader_factory.create_reader(
482483
partition_index,
483484
partitioned_file.clone(),
484485
metadata_size_hint,
485486
&metrics,
486-
)?;
487+
)?,
488+
);
487489
let mut bf_builder =
488490
ParquetRecordBatchStreamBuilder::new_with_metadata(
489491
bf_reader,
@@ -493,7 +495,7 @@ impl FileOpener for ParquetOpener {
493495
.prune_by_bloom_filters(
494496
&physical_file_schema,
495497
&mut bf_builder,
496-
predicate,
498+
pruning_pred,
497499
&file_metrics,
498500
)
499501
.await;
@@ -558,7 +560,7 @@ impl FileOpener for ParquetOpener {
558560
}
559561

560562
// ---------------------------------------------------------
561-
// Step: construct the push decoder builder
563+
// Step: construct builder for the final RecordBatch stream
562564
// ---------------------------------------------------------
563565

564566
let mut builder =
@@ -619,7 +621,10 @@ impl FileOpener for ParquetOpener {
619621
let arrow_reader_metrics = ArrowReaderMetrics::enabled();
620622

621623
let indices = projection.column_indices();
622-
let mask = ProjectionMask::roots(reader_metadata.parquet_schema(), indices);
624+
let mask = ProjectionMask::roots(
625+
reader_metadata.parquet_schema(),
626+
indices.clone(),
627+
);
623628

624629
let decoder = builder
625630
.with_projection(mask)
@@ -632,13 +637,22 @@ impl FileOpener for ParquetOpener {
632637
file_metrics.predicate_cache_inner_records.clone();
633638
let predicate_cache_records = file_metrics.predicate_cache_records.clone();
634639

635-
// Create a stream that drives the decoder by fetching data as needed
640+
// Rebase column indices to match the narrowed stream schema.
641+
// The projection expressions have indices based on physical_file_schema,
642+
// but the stream only contains the columns selected by the ProjectionMask.
643+
let stream_schema =
644+
Arc::new(physical_file_schema.project(&indices)?);
645+
let replace_schema = stream_schema != output_schema;
646+
let projection = projection
647+
.try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
648+
let projector = projection.make_projector(&stream_schema)?;
636649
let stream = futures::stream::unfold(
637650
PushDecoderStreamState {
638651
decoder,
639652
reader: async_file_reader,
640-
projection,
653+
projector,
641654
output_schema,
655+
replace_schema,
642656
arrow_reader_metrics,
643657
predicate_cache_inner_records,
644658
predicate_cache_records,
@@ -672,11 +686,7 @@ impl FileOpener for ParquetOpener {
672686
&state.predicate_cache_inner_records,
673687
&state.predicate_cache_records,
674688
);
675-
let result = apply_projection(
676-
batch,
677-
&state.projection,
678-
&state.output_schema,
679-
);
689+
let result = state.project_batch(batch);
680690
return Some((result, state));
681691
}
682692
Ok(DecodeResult::Finished) => {
@@ -694,7 +704,6 @@ impl FileOpener for ParquetOpener {
694704
// Step: wrap the stream so a dynamic filter can stop the file scan early
695705
// ----------------------------------------------------------------------
696706
if let Some(file_pruner) = file_pruner {
697-
// Box the stream first to satisfy Unpin bound on EarlyStoppingStream
698707
let boxed_stream = stream.boxed();
699708
Ok(EarlyStoppingStream::new(
700709
boxed_stream,
@@ -709,41 +718,31 @@ impl FileOpener for ParquetOpener {
709718
}
710719
}
711720

712-
/// State carried through the unfold stream
713721
struct PushDecoderStreamState {
714722
decoder: parquet::arrow::push_decoder::ParquetPushDecoder,
715723
reader: Box<dyn AsyncFileReader>,
716-
projection: ProjectionExprs,
724+
projector: datafusion_physical_expr::projection::Projector,
717725
output_schema: Arc<arrow::datatypes::Schema>,
726+
replace_schema: bool,
718727
arrow_reader_metrics: ArrowReaderMetrics,
719728
predicate_cache_inner_records: Gauge,
720729
predicate_cache_records: Gauge,
721730
}
722731

723-
/// Apply projection expressions to the decoded batch and ensure correct schema
724-
fn apply_projection(
725-
batch: RecordBatch,
726-
projection: &ProjectionExprs,
727-
output_schema: &Arc<arrow::datatypes::Schema>,
728-
) -> Result<RecordBatch> {
729-
let stream_schema = batch.schema();
730-
let projection = projection
731-
.clone()
732-
.try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
733-
let projector = projection.make_projector(&stream_schema)?;
734-
let mut batch = projector.project_batch(&batch)?;
735-
736-
if batch.schema() != *output_schema {
737-
let (_schema, arrays, num_rows) = batch.into_parts();
738-
let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
739-
batch = RecordBatch::try_new_with_options(
740-
Arc::clone(output_schema),
741-
arrays,
742-
&options,
743-
)?;
732+
impl PushDecoderStreamState {
733+
fn project_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
734+
let mut batch = self.projector.project_batch(&batch)?;
735+
if self.replace_schema {
736+
let (_schema, arrays, num_rows) = batch.into_parts();
737+
let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
738+
batch = RecordBatch::try_new_with_options(
739+
Arc::clone(&self.output_schema),
740+
arrays,
741+
&options,
742+
)?;
743+
}
744+
Ok(batch)
744745
}
745-
746-
Ok(batch)
747746
}
748747

749748
/// Copies metrics from ArrowReaderMetrics (the metrics collected by the

0 commit comments

Comments (0)