Commit ad6a507

adriangb, claude, and xudong963 authored
refactor(parquet-datasource): extract DecoderProjection from build_stream (apache#22398)
## Which issue does this PR close?

- Closes #.

## Rationale for this change

`RowGroupsPrunedParquetOpen::build_stream` inlines the `build_projection_read_plan` + `reassign_expr_columns` + `make_projector` + `replace_schema` quartet right next to the decoder / stream wiring, which makes the opener's main orchestration body harder to follow and mixes two concerns: building the per-file projection vs. wiring it through the push-decoder stream.

This PR isolates that block behind a small `DecoderProjection` type whose public surface is just "give me the projection mask" and "project this decoded batch onto the output schema."

## What changes are included in this PR?

* New `decoder_projection` module with a `DecoderProjection` type:
  * `DecoderProjection::try_new(projection, physical_file_schema, parquet_schema, output_schema)` constructs the per-file projection in one call.
  * `projection_mask()` returns the mask installed on every decoder run.
  * `map(&batch)` applies the projector and, when needed, rebuilds the batch with `output_schema` to recover metadata / nullability that the file schema does not carry.
  * Fields are private.
* `PushDecoderStreamState` collapses three fields (`projector`, `output_schema`, `replace_schema`) into a single `decoder_projection: DecoderProjection`. `project_batch` becomes a one-line delegate to `DecoderProjection::map`.
* `replace_schema` is now derived from the projector's *output* schema (rather than the read plan's projected schema) so it stays correct under future widening of the decoder mask.
* `DecoderBuilderConfig` carries the projection mask directly (`projection_mask: &ProjectionMask`) instead of the full `ParquetReadPlan`, since the read plan's `projected_schema` is no longer needed in this layer.

No behaviour change.

## Are these changes tested?

Covered by existing tests:

* `cargo test -p datafusion-datasource-parquet`: 123 pass.
* `cargo test -p datafusion --test parquet_integration`: 202 pass.
* `cargo clippy -p datafusion-datasource-parquet --all-targets --all-features -- -D warnings`: clean.

## Are there any user-facing changes?

No. All affected types are `pub(crate)`.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: xudong.w <wxd963996380@gmail.com>
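
The point about deriving `replace_schema` from the projector's output schema can be illustrated with a small std-only model. This is a sketch under stated assumptions, not the DataFusion code: `Field`, `projector_output`, `flag_from_stream`, and `flag_from_projector` are hypothetical names, and a schema is reduced to a list of `(name, nullable)` pairs.

```rust
// Std-only model. All names here are hypothetical and exist only for
// illustration; they are not DataFusion or arrow APIs.

#[derive(Clone, Debug, PartialEq)]
struct Field {
    name: &'static str,
    nullable: bool,
}

/// Schema the projector would emit: the requested columns, in request
/// order, looked up by name in the decoder's stream schema.
fn projector_output(stream: &[Field], wanted: &[&str]) -> Vec<Field> {
    wanted
        .iter()
        .filter_map(|w| stream.iter().find(|f| f.name == *w).cloned())
        .collect()
}

/// Previous derivation: compare the decoder's stream schema directly
/// with the scan's output schema.
fn flag_from_stream(stream: &[Field], output: &[Field]) -> bool {
    stream != output
}

/// Derivation described in this PR: compare the projector's *output*
/// schema with the scan's output schema.
fn flag_from_projector(stream: &[Field], wanted: &[&str], output: &[Field]) -> bool {
    projector_output(stream, wanted).as_slice() != output
}
```

In this model, widening the stream schema with a column the projector does not emit flips `flag_from_stream` to `true` but leaves `flag_from_projector` unchanged, which is the robustness the bullet above describes.
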
1 parent d318324 commit ad6a507

4 files changed

Lines changed: 155 additions & 51 deletions

Lines changed: 130 additions & 0 deletions
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Decoder-projection construction for the parquet scan.
+//!
+//! [`DecoderProjection`] owns the two halves of "project a decoded parquet
+//! batch onto the scan's output schema":
+//!
+//! * the [`ProjectionMask`] installed on every parquet decoder run, and
+//! * the per-batch transform ([`DecoderProjection::map`]) that applies the
+//!   projector and, when needed, rebuilds the batch with the user's
+//!   `output_schema` to recover metadata / nullability the file schema does
+//!   not carry.
+//!
+//! The opener constructs one [`DecoderProjection`] per file via
+//! [`DecoderProjection::try_new`] and hands it to the push-decoder stream,
+//! which calls [`map`](DecoderProjection::map) on every decoded batch.
+
+use std::sync::Arc;
+
+use arrow::array::{RecordBatch, RecordBatchOptions};
+use arrow::datatypes::SchemaRef;
+
+use datafusion_common::Result;
+use datafusion_physical_expr::projection::{ProjectionExprs, Projector};
+use datafusion_physical_expr::utils::reassign_expr_columns;
+
+use parquet::arrow::ProjectionMask;
+use parquet::schema::types::SchemaDescriptor;
+
+use crate::row_filter::build_projection_read_plan;
+
+/// Per-file decoder projection: the [`ProjectionMask`] installed on every
+/// parquet decoder run, plus the per-batch transform that maps the decoder's
+/// output onto the scan's `output_schema`.
+///
+/// Built once per file by the opener via [`Self::try_new`]; the
+/// push-decoder stream installs [`Self::projection_mask`] on each decoder
+/// and calls [`Self::map`] on every decoded batch.
+pub(crate) struct DecoderProjection {
+    projection_mask: ProjectionMask,
+    projector: Projector,
+    output_schema: SchemaRef,
+    /// `true` when the projector's output schema differs from `output_schema`
+    /// in metadata / nullability and [`map`](Self::map) must rebuild the batch
+    /// with `output_schema`.
+    replace_schema: bool,
+}
+
+impl DecoderProjection {
+    /// Build the decoder projection for a file.
+    ///
+    /// `projection` references columns in `physical_file_schema` (i.e. already
+    /// adapted by the per-file expr adapter); `parquet_schema` is the
+    /// corresponding parquet [`SchemaDescriptor`]. `output_schema` is what
+    /// consumers of the scan stream expect.
+    pub(crate) fn try_new(
+        projection: &ProjectionExprs,
+        physical_file_schema: &SchemaRef,
+        parquet_schema: &SchemaDescriptor,
+        output_schema: &SchemaRef,
+    ) -> Result<Self> {
+        let read_plan = build_projection_read_plan(
+            projection.expr_iter(),
+            physical_file_schema,
+            parquet_schema,
+        );
+
+        let stream_schema = read_plan.projected_schema;
+
+        // Rebase the projection onto the decoder's stream schema (column
+        // indices change because the decoder yields only the masked columns).
+        let rebased_projection = projection
+            .clone()
+            .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
+        let projector = rebased_projection.make_projector(&stream_schema)?;
+
+        // Compare against the projector's *output* schema rather than the
+        // stream schema, so future widening of the mask (e.g. for post-scan
+        // filter columns) does not flip this flag.
+        let replace_schema = projector.output_schema() != output_schema;
+
+        Ok(Self {
+            projection_mask: read_plan.projection_mask,
+            projector,
+            output_schema: Arc::clone(output_schema),
+            replace_schema,
+        })
+    }
+
+    /// The projection mask to install on every parquet decoder in the scan.
+    pub(crate) fn projection_mask(&self) -> &ProjectionMask {
+        &self.projection_mask
+    }
+
+    /// Map a decoded batch onto the scan's output schema.
+    ///
+    /// Applies the [`Projector`] and, when the projector's output schema
+    /// differs from `output_schema` in metadata or nullability, rebuilds the
+    /// batch with `output_schema` (some writers emit OPTIONAL fields even when
+    /// the data has no nulls; some logical schemas carry field-level metadata
+    /// the file schema does not).
+    pub(crate) fn map(&self, batch: &RecordBatch) -> Result<RecordBatch> {
+        let projected = self.projector.project_batch(batch)?;
+        if !self.replace_schema {
+            return Ok(projected);
+        }
+        let (_stream_schema, arrays, num_rows) = projected.into_parts();
+        let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
+        Ok(RecordBatch::try_new_with_options(
+            Arc::clone(&self.output_schema),
+            arrays,
+            &options,
+        )?)
+    }
+}
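
The rebasing step in `try_new` exists because the decoder yields only the masked columns, so a column's position in the stream differs from its position in the file schema. The following is a minimal std-only sketch of that index shift, assuming the decoder emits masked columns in file-schema order. `build_mask`, `rebase`, and `rebase_all` are hypothetical helpers for illustration; the real code works on physical expressions through `reassign_expr_columns`, which is not reproduced here.

```rust
// Std-only model of "column indices change because the decoder yields
// only the masked columns". All function names are hypothetical.

/// Stand-in for a projection mask: the sorted, de-duplicated file-schema
/// column indices the decoder is asked to read.
fn build_mask(referenced: &[usize]) -> Vec<usize> {
    let mut mask = referenced.to_vec();
    mask.sort_unstable();
    mask.dedup();
    mask
}

/// Translate a file-schema column index into its position in the narrowed
/// stream. Returns `None` if the column is not part of the mask.
fn rebase(mask: &[usize], file_index: usize) -> Option<usize> {
    mask.binary_search(&file_index).ok()
}

/// Rebase every column reference of a projection, failing as a whole if
/// any reference points outside the mask.
fn rebase_all(mask: &[usize], file_indices: &[usize]) -> Option<Vec<usize>> {
    file_indices.iter().map(|&i| rebase(mask, i)).collect()
}
```

For a projection that references file columns 5, 2, and 5 again, the modelled mask is `[2, 5]`, so the references become stream positions 1, 0, and 1.
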

datafusion/datasource-parquet/src/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@
 
 pub mod access_plan;
 mod bloom_filter;
+mod decoder_projection;
 pub mod file_format;
 pub mod metadata;
 mod metrics;

datafusion/datasource-parquet/src/opener/mod.rs

Lines changed: 13 additions & 22 deletions
@@ -24,9 +24,10 @@ use self::early_stop::EarlyStoppingStream;
 #[cfg(feature = "parquet_encryption")]
 use self::encryption::EncryptionContext;
 use crate::access_plan::PreparedAccessPlan;
+use crate::decoder_projection::DecoderProjection;
 use crate::page_filter::PagePruningAccessPlanFilter;
 use crate::push_decoder::{DecoderBuilderConfig, PushDecoderStreamState};
-use crate::row_filter::{RowFilterGenerator, build_projection_read_plan};
+use crate::row_filter::RowFilterGenerator;
 use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter};
 use crate::{
     Int96Coercer, ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
@@ -36,7 +37,6 @@ use arrow::array::RecordBatch;
 use arrow::datatypes::DataType;
 use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer};
 use datafusion_physical_expr::projection::ProjectionExprs;
-use datafusion_physical_expr::utils::reassign_expr_columns;
 use datafusion_physical_expr_adapter::replace_columns_with_literals;
 use std::collections::{HashMap, VecDeque};
 use std::fmt;
@@ -1156,11 +1156,17 @@ impl RowGroupsPrunedParquetOpen {
         };
 
         let arrow_reader_metrics = ArrowReaderMetrics::enabled();
-        let read_plan = build_projection_read_plan(
-            prepared.projection.expr_iter(),
+
+        // Build the decoder projection (mask + per-batch transform) in a
+        // single call. Encapsulating it behind `DecoderProjection` keeps the
+        // opener's orchestration body focused on filter / decoder / stream
+        // wiring.
+        let decoder_projection = DecoderProjection::try_new(
+            &prepared.projection,
             &prepared.physical_file_schema,
             reader_metadata.parquet_schema(),
-        );
+            &prepared.output_schema,
+        )?;
 
         let (decoder, pending_decoders, remaining_limit) = {
             let pushdown_predicate = prepared
@@ -1188,7 +1194,7 @@ impl RowGroupsPrunedParquetOpen {
             let remaining_limit = prepared.limit.filter(|_| run_count > 1);
 
             let decoder_config = DecoderBuilderConfig {
-                read_plan: &read_plan,
+                projection_mask: decoder_projection.projection_mask(),
                 batch_size: prepared.batch_size,
                 arrow_reader_metrics: &arrow_reader_metrics,
                 force_filter_selections: prepared.force_filter_selections,
@@ -1226,29 +1232,14 @@ impl RowGroupsPrunedParquetOpen {
         let predicate_cache_records =
             prepared.file_metrics.predicate_cache_records.clone();
 
-        // Check if we need to replace the schema to handle things like differing nullability or metadata.
-        // See note below about file vs. output schema.
-        let stream_schema = read_plan.projected_schema;
-        let replace_schema = stream_schema != prepared.output_schema;
-
-        // Rebase column indices to match the narrowed stream schema.
-        // The projection expressions have indices based on physical_file_schema,
-        // but the stream only contains the columns selected by the ProjectionMask.
-        let projection = prepared
-            .projection
-            .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
-        let projector = projection.make_projector(&stream_schema)?;
-        let output_schema = Arc::clone(&prepared.output_schema);
         let files_ranges_pruned_statistics =
             prepared.file_metrics.files_ranges_pruned_statistics.clone();
         let stream = PushDecoderStreamState {
             decoder,
             pending_decoders,
             remaining_limit,
             reader: prepared.async_file_reader,
-            projector,
-            output_schema,
-            replace_schema,
+            decoder_projection,
             arrow_reader_metrics,
             predicate_cache_inner_records,
             predicate_cache_records,

datafusion/datasource-parquet/src/push_decoder.rs

Lines changed: 11 additions & 29 deletions
@@ -32,24 +32,22 @@
 //! [`PushDecoderStreamState::into_stream`] for consumption.
 
 use std::collections::VecDeque;
-use std::sync::Arc;
 
-use arrow::array::{RecordBatch, RecordBatchOptions};
-use arrow::datatypes::Schema;
+use arrow::array::RecordBatch;
 use futures::StreamExt;
 use futures::stream::BoxStream;
 use parquet::DecodeResult;
+use parquet::arrow::ProjectionMask;
 use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, RowSelectionPolicy};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
 
 use datafusion_common::{DataFusionError, Result};
-use datafusion_physical_expr::projection::Projector;
 use datafusion_physical_plan::metrics::{BaselineMetrics, Gauge};
 
 use crate::access_plan::PreparedAccessPlan;
-use crate::row_filter::ParquetReadPlan;
+use crate::decoder_projection::DecoderProjection;
 
 /// Shared options applied to every [`ParquetPushDecoderBuilder`] in a file scan.
 ///
@@ -58,7 +56,9 @@ use crate::row_filter::ParquetReadPlan;
 /// requirements). All decoders in that scan share the same projection, batch
 /// size, metrics sink, and selection policy.
 pub(crate) struct DecoderBuilderConfig<'a> {
-    pub(crate) read_plan: &'a ParquetReadPlan,
+    /// Projection mask installed on every decoder in the scan. Sourced from
+    /// the file's [`DecoderProjection`].
+    pub(crate) projection_mask: &'a ProjectionMask,
     pub(crate) batch_size: usize,
     pub(crate) arrow_reader_metrics: &'a ArrowReaderMetrics,
     pub(crate) force_filter_selections: bool,
@@ -77,7 +77,7 @@ impl DecoderBuilderConfig<'_> {
         metadata: ArrowReaderMetadata,
     ) -> ParquetPushDecoderBuilder {
         let mut builder = ParquetPushDecoderBuilder::new_with_metadata(metadata)
-            .with_projection(self.read_plan.projection_mask.clone())
+            .with_projection(self.projection_mask.clone())
             .with_batch_size(self.batch_size)
             .with_metrics(self.arrow_reader_metrics.clone());
         if self.force_filter_selections {
@@ -113,9 +113,9 @@ pub(crate) struct PushDecoderStreamState {
     /// here instead.
     pub(crate) remaining_limit: Option<usize>,
     pub(crate) reader: Box<dyn AsyncFileReader>,
-    pub(crate) projector: Projector,
-    pub(crate) output_schema: Arc<Schema>,
-    pub(crate) replace_schema: bool,
+    /// Per-file projection: the mask installed on every decoder and the
+    /// per-batch transform applied by [`Self::project_batch`].
+    pub(crate) decoder_projection: DecoderProjection,
     pub(crate) arrow_reader_metrics: ArrowReaderMetrics,
     pub(crate) predicate_cache_inner_records: Gauge,
     pub(crate) predicate_cache_records: Gauge,
@@ -216,24 +216,6 @@ impl PushDecoderStreamState {
     }
 
     fn project_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
-        let mut batch = self.projector.project_batch(batch)?;
-        if self.replace_schema {
-            // Ensure the output batch has the expected schema.
-            // This handles things like schema level and field level metadata, which may not be present
-            // in the physical file schema.
-            // It is also possible for nullability to differ; some writers create files with
-            // OPTIONAL fields even when there are no nulls in the data.
-            // In these cases it may make sense for the logical schema to be `NOT NULL`.
-            // RecordBatch::try_new_with_options checks that if the schema is NOT NULL
-            // the array cannot contain nulls, amongst other checks.
-            let (_stream_schema, arrays, num_rows) = batch.into_parts();
-            let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
-            batch = RecordBatch::try_new_with_options(
-                Arc::clone(&self.output_schema),
-                arrays,
-                &options,
-            )?;
-        }
-        Ok(batch)
+        self.decoder_projection.map(batch)
     }
 }
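
The removed comments note that rebuilding a batch under a `NOT NULL` output schema is validated: a column containing nulls is rejected. The behaviour that now lives behind `DecoderProjection::map` can be modelled without arrow as follows. This is a sketch under stated assumptions: `Field`, `Batch`, `rebuild_with_schema`, and `map_batch` are hypothetical, columns are reduced to `Vec<Option<i64>>`, and only the nullability check is modelled (the real constructor performs other checks as well).

```rust
// Std-only model of "project, then optionally rebuild under the output
// schema". All types and functions here are hypothetical stand-ins.

#[derive(Clone, Debug, PartialEq)]
struct Field {
    name: &'static str,
    nullable: bool,
}

#[derive(Clone, Debug, PartialEq)]
struct Batch {
    schema: Vec<Field>,
    columns: Vec<Vec<Option<i64>>>,
}

/// Rebuild `batch` under `output_schema`, reusing the column data as-is.
/// Fails if a column declared NOT NULL actually contains a null.
fn rebuild_with_schema(batch: Batch, output_schema: &[Field]) -> Result<Batch, String> {
    if batch.columns.len() != output_schema.len() {
        return Err(format!(
            "expected {} columns, got {}",
            output_schema.len(),
            batch.columns.len()
        ));
    }
    for (field, column) in output_schema.iter().zip(&batch.columns) {
        if !field.nullable && column.iter().any(|v| v.is_none()) {
            return Err(format!(
                "column `{}` is declared NOT NULL but contains nulls",
                field.name
            ));
        }
    }
    Ok(Batch {
        schema: output_schema.to_vec(),
        columns: batch.columns,
    })
}

/// Model of the per-batch transform: pass the projected batch through
/// unchanged unless the schemas differ, in which case rebuild it.
fn map_batch(
    projected: Batch,
    output_schema: &[Field],
    replace_schema: bool,
) -> Result<Batch, String> {
    if !replace_schema {
        return Ok(projected);
    }
    rebuild_with_schema(projected, output_schema)
}
```

The early return mirrors the fast path in `map`: when the flag is `false`, the projected batch is handed back without touching its schema.
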