Skip to content

Commit e1c1a45

Browse files
adriangb and claude committed
refactor(parquet-datasource): extract DecoderProjection from build_stream
`RowGroupsPrunedParquetOpen::build_stream` used to inline the `build_projection_read_plan` + `reassign_expr_columns` + `make_projector` + `replace_schema` sequence right next to the decoder / stream wiring, which made the opener's main orchestration body harder to follow.

Move that sequence into a new `post_scan_filter` module exposing a single `DecoderProjection::build(projection, physical_file_schema, parquet_schema, output_schema)` entry point that returns the projection mask, projector, and `replace_schema` flag. The opener's projection setup becomes a single call.

`replace_schema` is now derived from the projector's output schema (rather than the read plan's projected schema) so it stays correct under future widening of the decoder mask.

`DecoderBuilderConfig` now carries the projection mask directly (`projection_mask: &ProjectionMask`) instead of the full `ParquetReadPlan`, since the read plan's `projected_schema` is no longer needed in this layer.

No behaviour change. All existing parquet tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent c8b784a commit e1c1a45

4 files changed

Lines changed: 122 additions & 21 deletions

File tree

datafusion/datasource-parquet/src/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pub mod metadata;
3030
mod metrics;
3131
mod opener;
3232
mod page_filter;
33+
mod post_scan_filter;
3334
mod push_decoder;
3435
mod reader;
3536
mod row_filter;

datafusion/datasource-parquet/src/opener/mod.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ use self::early_stop::EarlyStoppingStream;
2525
use self::encryption::EncryptionContext;
2626
use crate::access_plan::PreparedAccessPlan;
2727
use crate::page_filter::PagePruningAccessPlanFilter;
28+
use crate::post_scan_filter::DecoderProjection;
2829
use crate::push_decoder::{DecoderBuilderConfig, PushDecoderStreamState};
29-
use crate::row_filter::{RowFilterGenerator, build_projection_read_plan};
30+
use crate::row_filter::RowFilterGenerator;
3031
use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter};
3132
use crate::{
3233
ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
@@ -36,7 +37,6 @@ use arrow::array::RecordBatch;
3637
use arrow::datatypes::DataType;
3738
use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer};
3839
use datafusion_physical_expr::projection::ProjectionExprs;
39-
use datafusion_physical_expr::utils::reassign_expr_columns;
4040
use datafusion_physical_expr_adapter::replace_columns_with_literals;
4141
use std::collections::{HashMap, VecDeque};
4242
use std::fmt;
@@ -1148,11 +1148,18 @@ impl RowGroupsPrunedParquetOpen {
11481148
};
11491149

11501150
let arrow_reader_metrics = ArrowReaderMetrics::enabled();
1151-
let read_plan = build_projection_read_plan(
1152-
prepared.projection.expr_iter(),
1151+
1152+
// Build the decoder projection (mask + projector + replace_schema) in
1153+
// a single call. Encapsulating it behind `DecoderProjection` keeps
1154+
// the opener's orchestration body focused on filter / decoder /
1155+
// stream wiring, and gives a clean seam for the in-scan post-scan
1156+
// filter introduced in a later change.
1157+
let decoder_projection = DecoderProjection::build(
1158+
&prepared.projection,
11531159
&prepared.physical_file_schema,
11541160
reader_metadata.parquet_schema(),
1155-
);
1161+
&prepared.output_schema,
1162+
)?;
11561163

11571164
let (decoder, pending_decoders, remaining_limit) = {
11581165
let pushdown_predicate = prepared
@@ -1180,7 +1187,7 @@ impl RowGroupsPrunedParquetOpen {
11801187
let remaining_limit = prepared.limit.filter(|_| run_count > 1);
11811188

11821189
let decoder_config = DecoderBuilderConfig {
1183-
read_plan: &read_plan,
1190+
projection_mask: &decoder_projection.projection_mask,
11841191
batch_size: prepared.batch_size,
11851192
arrow_reader_metrics: &arrow_reader_metrics,
11861193
force_filter_selections: prepared.force_filter_selections,
@@ -1218,18 +1225,11 @@ impl RowGroupsPrunedParquetOpen {
12181225
let predicate_cache_records =
12191226
prepared.file_metrics.predicate_cache_records.clone();
12201227

1221-
// Check if we need to replace the schema to handle things like differing nullability or metadata.
1222-
// See note below about file vs. output schema.
1223-
let stream_schema = read_plan.projected_schema;
1224-
let replace_schema = stream_schema != prepared.output_schema;
1225-
1226-
// Rebase column indices to match the narrowed stream schema.
1227-
// The projection expressions have indices based on physical_file_schema,
1228-
// but the stream only contains the columns selected by the ProjectionMask.
1229-
let projection = prepared
1230-
.projection
1231-
.try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
1232-
let projector = projection.make_projector(&stream_schema)?;
1228+
let DecoderProjection {
1229+
projection_mask: _,
1230+
projector,
1231+
replace_schema,
1232+
} = decoder_projection;
12331233
let output_schema = Arc::clone(&prepared.output_schema);
12341234
let files_ranges_pruned_statistics =
12351235
prepared.file_metrics.files_ranges_pruned_statistics.clone();
datafusion/datasource-parquet/src/post_scan_filter.rs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Decoder-projection construction for the parquet scan.
19+
//!
20+
//! Owns the projection-mask + projector + schema-replacement triple the
21+
//! opener installs on every parquet decoder run, behind a single
22+
//! [`DecoderProjection::build`] entry point. Keeping it here lets the
23+
//! opener orchestrate scans with one call instead of an inline block of
24+
//! `build_projection_read_plan` / `reassign_expr_columns` / `make_projector`,
25+
//! and gives a clean seam for the in-scan post-scan filter that follows in
26+
//! a later change (`PostScanFilter`, when conjuncts the parquet `RowFilter`
27+
//! cannot evaluate fall through to a decoded-batch predicate).
28+
29+
use arrow::datatypes::SchemaRef;
30+
31+
use datafusion_common::Result;
32+
use datafusion_physical_expr::projection::{ProjectionExprs, Projector};
33+
use datafusion_physical_expr::utils::reassign_expr_columns;
34+
35+
use parquet::arrow::ProjectionMask;
36+
use parquet::schema::types::SchemaDescriptor;
37+
38+
use crate::row_filter::build_projection_read_plan;
39+
40+
/// The parquet decoder projection: the [`ProjectionMask`] installed on every
41+
/// decoder run in the scan, the [`Projector`] that maps decoder output
42+
/// batches to the user-visible output, and the `replace_schema` flag that
43+
/// tells [`PushDecoderStreamState`](crate::push_decoder::PushDecoderStreamState)
44+
/// whether the projector's output schema must be rebuilt with the requested
45+
/// `output_schema` (e.g. for metadata / nullability mismatches).
46+
///
47+
/// Built once per file by the opener via [`Self::build`].
48+
pub(crate) struct DecoderProjection {
49+
/// Projection mask passed to the parquet decoder.
50+
pub(crate) projection_mask: ProjectionMask,
51+
/// Maps decoder output (stream) batches to the user-visible output.
52+
pub(crate) projector: Projector,
53+
/// `true` when the projector's output schema differs from `output_schema`
54+
/// (e.g. in metadata / nullability), so the caller must rebuild the batch with
55+
/// `output_schema` before yielding it.
56+
pub(crate) replace_schema: bool,
57+
}
58+
59+
impl DecoderProjection {
60+
/// Build the decoder projection state for a file.
61+
///
62+
/// `projection` references columns in `physical_file_schema` (i.e. already
63+
/// adapted by the per-file expr adapter); `parquet_schema` is the
64+
/// corresponding parquet `SchemaDescriptor`. `output_schema` is what
65+
/// consumers of the scan stream expect.
66+
pub(crate) fn build(
67+
projection: &ProjectionExprs,
68+
physical_file_schema: &SchemaRef,
69+
parquet_schema: &SchemaDescriptor,
70+
output_schema: &SchemaRef,
71+
) -> Result<Self> {
72+
let read_plan = build_projection_read_plan(
73+
projection.expr_iter(),
74+
physical_file_schema,
75+
parquet_schema,
76+
);
77+
78+
let stream_schema = read_plan.projected_schema;
79+
80+
// Rebase the projection onto the decoder's stream schema (column
81+
// indices change because the decoder yields only the masked columns).
82+
let rebased_projection = projection
83+
.clone()
84+
.try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
85+
let projector = rebased_projection.make_projector(&stream_schema)?;
86+
87+
// Compare against the projector's *output* schema rather than the
88+
// stream schema, so future widening of the mask (for post-scan filter
89+
// columns) does not flip this flag.
90+
let replace_schema = projector.output_schema() != output_schema;
91+
92+
Ok(Self {
93+
projection_mask: read_plan.projection_mask,
94+
projector,
95+
replace_schema,
96+
})
97+
}
98+
}

datafusion/datasource-parquet/src/push_decoder.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use futures::StreamExt;
4040
use futures::stream::BoxStream;
4141
use parquet::DecodeResult;
4242
use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
43+
use parquet::arrow::ProjectionMask;
4344
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, RowSelectionPolicy};
4445
use parquet::arrow::async_reader::AsyncFileReader;
4546
use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
@@ -49,7 +50,6 @@ use datafusion_physical_expr::projection::Projector;
4950
use datafusion_physical_plan::metrics::{BaselineMetrics, Gauge};
5051

5152
use crate::access_plan::PreparedAccessPlan;
52-
use crate::row_filter::ParquetReadPlan;
5353

5454
/// Shared options applied to every [`ParquetPushDecoderBuilder`] in a file scan.
5555
///
@@ -58,7 +58,9 @@ use crate::row_filter::ParquetReadPlan;
5858
/// requirements). All decoders in that scan share the same projection, batch
5959
/// size, metrics sink, and selection policy.
6060
pub(crate) struct DecoderBuilderConfig<'a> {
61-
pub(crate) read_plan: &'a ParquetReadPlan,
61+
/// Projection mask installed on every decoder in the scan. Sourced from
62+
/// the file's [`DecoderProjection`](crate::post_scan_filter::DecoderProjection).
63+
pub(crate) projection_mask: &'a ProjectionMask,
6264
pub(crate) batch_size: usize,
6365
pub(crate) arrow_reader_metrics: &'a ArrowReaderMetrics,
6466
pub(crate) force_filter_selections: bool,
@@ -77,7 +79,7 @@ impl DecoderBuilderConfig<'_> {
7779
metadata: ArrowReaderMetadata,
7880
) -> ParquetPushDecoderBuilder {
7981
let mut builder = ParquetPushDecoderBuilder::new_with_metadata(metadata)
80-
.with_projection(self.read_plan.projection_mask.clone())
82+
.with_projection(self.projection_mask.clone())
8183
.with_batch_size(self.batch_size)
8284
.with_metrics(self.arrow_reader_metrics.clone());
8385
if self.force_filter_selections {

0 commit comments

Comments
 (0)