Skip to content

Commit 1ca7dc5

Browse files
Dandandan and Claude
committed
Rewrite ParquetOpener to use push-based ParquetPushDecoder
Replace the async pull-based ParquetRecordBatchStreamBuilder with arrow-rs's sans-IO ParquetPushDecoder for reading Parquet files. The caller now controls IO explicitly via DecodeResult::NeedsData, pushing byte ranges to the decoder and receiving decoded batches.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 1a23f17 commit 1ca7dc5

File tree

1 file changed

+8
-10
lines changed

1 file changed

+8
-10
lines changed

datafusion/datasource-parquet/src/opener.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -621,10 +621,8 @@ impl FileOpener for ParquetOpener {
621621
let arrow_reader_metrics = ArrowReaderMetrics::enabled();
622622

623623
let indices = projection.column_indices();
624-
let mask = ProjectionMask::roots(
625-
reader_metadata.parquet_schema(),
626-
indices.clone(),
627-
);
624+
let mask =
625+
ProjectionMask::roots(reader_metadata.parquet_schema(), indices.clone());
628626

629627
let decoder = builder
630628
.with_projection(mask)
@@ -640,8 +638,7 @@ impl FileOpener for ParquetOpener {
640638
// Rebase column indices to match the narrowed stream schema.
641639
// The projection expressions have indices based on physical_file_schema,
642640
// but the stream only contains the columns selected by the ProjectionMask.
643-
let stream_schema =
644-
Arc::new(physical_file_schema.project(&indices)?);
641+
let stream_schema = Arc::new(physical_file_schema.project(&indices)?);
645642
let replace_schema = stream_schema != output_schema;
646643
let projection = projection
647644
.try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
@@ -686,7 +683,7 @@ impl FileOpener for ParquetOpener {
686683
&state.predicate_cache_inner_records,
687684
&state.predicate_cache_records,
688685
);
689-
let result = state.project_batch(batch);
686+
let result = state.project_batch(&batch);
690687
return Some((result, state));
691688
}
692689
Ok(DecodeResult::Finished) => {
@@ -698,7 +695,8 @@ impl FileOpener for ParquetOpener {
698695
}
699696
}
700697
},
701-
);
698+
)
699+
.fuse();
702700

703701
// ----------------------------------------------------------------------
704702
// Step: wrap the stream so a dynamic filter can stop the file scan early
@@ -730,8 +728,8 @@ struct PushDecoderStreamState {
730728
}
731729

732730
impl PushDecoderStreamState {
733-
fn project_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
734-
let mut batch = self.projector.project_batch(&batch)?;
731+
fn project_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
732+
let mut batch = self.projector.project_batch(batch)?;
735733
if self.replace_schema {
736734
let (_schema, arrays, num_rows) = batch.into_parts();
737735
let options = RecordBatchOptions::new().with_row_count(Some(num_rows));

0 commit comments

Comments (0)