Skip to content

Commit b044e05

Browse files
committed
refactor(parquet): split opener.rs into a module and add ParquetAccessPlanOptimizer trait
Two complementary changes that keep the Parquet opener manageable as more pruning strategies stack up: 1. Convert `opener.rs` into an `opener/` module. `EarlyStoppingStream` and `EncryptionContext` move to sibling files (`early_stop.rs`, `encryption.rs`). Pure code motion: no public API change, no behavior change. 2. Add `ParquetAccessPlanOptimizer`, an extension trait for refining a `ParquetAccessPlan` during file open. The opener invokes registered user-supplied optimizers after each built-in pruning stage (`AfterMetadata`, `AfterBloomFilters`, `BeforeBuildStream`). `ParquetSource::with_access_plan_optimizer` is the registration point. The built-in pruning passes (range, statistics, bloom filter, limit, page index) are deliberately *not* migrated to the trait in this PR; they share an `is_fully_matched` side-state with `prune_by_limit` that isn't yet modeled on the trait surface, so migrating them would require either plumbing that state through the trait or losing the limit-via-fully-matched optimization for built-in passes. Tracking that as follow-up.
1 parent 0c4ace8 commit b044e05

6 files changed

Lines changed: 607 additions & 159 deletions

File tree

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Extension point for refining a [`ParquetAccessPlan`] during file open.
19+
//!
20+
//! The Parquet opener narrows down which row groups (and which rows within
21+
//! them) it will read through a fixed sequence of built-in passes:
22+
//!
23+
//! - file-range pruning,
24+
//! - row-group statistics pruning,
25+
//! - bloom-filter pruning,
26+
//! - limit-based pruning,
27+
//! - page-index pruning.
28+
//!
29+
//! Each pass operates on a [`ParquetAccessPlan`]. The
30+
//! [`ParquetAccessPlanOptimizer`] trait exposes that pipeline as an extension
31+
//! point so external crates can append additional passes — sampling, custom
32+
//! statistics, user-defined Parquet indexes, etc. — without having to fork
33+
//! the opener.
34+
//!
35+
//! User-supplied optimizers are invoked **after** the built-in passes for the
36+
//! corresponding [`OptimizerStage`]. They can read the access plan and the
37+
//! pruning context, then return a (possibly narrowed) plan; they must not
38+
//! widen access beyond what the built-ins produced.
39+
40+
use std::fmt::Debug;
41+
use std::sync::Arc;
42+
43+
use datafusion_common::Result;
44+
use datafusion_datasource::{FileRange, PartitionedFile};
45+
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
46+
use parquet::file::metadata::ParquetMetaData;
47+
48+
use crate::ParquetAccessPlan;
49+
use crate::ParquetFileMetrics;
50+
use crate::page_filter::PagePruningAccessPlanFilter;
51+
use arrow::datatypes::SchemaRef;
52+
use datafusion_pruning::PruningPredicate;
53+
54+
/// Stage at which a [`ParquetAccessPlanOptimizer`] runs during file open.
///
/// Each stage has access to a different subset of file metadata, reflecting
/// the order in which the opener loads it. See [`AccessPlanContext`] for
/// the fields available at each stage.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OptimizerStage {
    /// Runs after the Parquet footer and page index (when enabled) are
    /// loaded, and after the built-in file-range and row-group-statistics
    /// passes have refined the plan. Bloom filters have **not** been
    /// loaded at this point.
    AfterMetadata,
    /// Runs after the bloom filters for surviving row groups have been
    /// loaded, and after the built-in bloom-filter pruning pass.
    AfterBloomFilters,
    /// Runs at the end of the pruning pipeline, just before the stream
    /// is built, after the built-in limit and page-index passes.
    BeforeBuildStream,
}
73+
74+
/// Read-only context passed to a [`ParquetAccessPlanOptimizer`].
75+
///
76+
/// Some fields are only populated at certain [`OptimizerStage`]s; see each
77+
/// field's docs.
78+
#[derive(Debug)]
79+
pub struct AccessPlanContext<'a> {
80+
/// Execution partition index for the scan.
81+
pub partition_index: usize,
82+
/// The file being opened.
83+
pub partitioned_file: &'a PartitionedFile,
84+
/// Optional byte range restricting which part of the file to read.
85+
pub file_range: Option<&'a FileRange>,
86+
/// Schema of the file after type coercions (the schema the parquet
87+
/// reader will produce, before projection).
88+
pub physical_file_schema: &'a SchemaRef,
89+
/// Loaded Parquet metadata, including page index when enabled.
90+
pub file_metadata: &'a ParquetMetaData,
91+
/// Raw predicate applied to this scan, if any.
92+
pub predicate: Option<&'a Arc<dyn PhysicalExpr>>,
93+
/// Row-group-level pruning predicate derived from `predicate`.
94+
pub pruning_predicate: Option<&'a Arc<PruningPredicate>>,
95+
/// Page-index pruning predicate derived from `predicate`.
96+
pub page_pruning_predicate: Option<&'a Arc<PagePruningAccessPlanFilter>>,
97+
/// Outer query limit, if any.
98+
pub limit: Option<usize>,
99+
/// Whether the query requires the original row order to be preserved.
100+
pub preserve_order: bool,
101+
/// Per-file metrics. Optimizers may emit to these counters.
102+
pub file_metrics: &'a ParquetFileMetrics,
103+
/// Current optimizer stage.
104+
pub stage: OptimizerStage,
105+
}
106+
107+
/// Trait for narrowing a [`ParquetAccessPlan`] during Parquet file open.
108+
///
109+
/// The opener invokes registered optimizers after each built-in pruning
110+
/// stage (see [`OptimizerStage`]). An optimizer that does not apply at the
111+
/// current stage should return the plan unchanged.
112+
///
113+
/// # Example
114+
///
115+
/// A sampling optimizer that keeps a fraction of the surviving row groups:
116+
///
117+
/// ```ignore
118+
/// use std::sync::Arc;
119+
/// use datafusion_common::Result;
120+
/// use datafusion_datasource_parquet::{
121+
/// access_plan_optimizer::{
122+
/// AccessPlanContext, OptimizerStage, ParquetAccessPlanOptimizer,
123+
/// },
124+
/// ParquetAccessPlan, RowGroupAccess,
125+
/// };
126+
///
127+
/// #[derive(Debug)]
128+
/// struct SampleHalf;
129+
///
130+
/// impl ParquetAccessPlanOptimizer for SampleHalf {
131+
/// fn stage(&self) -> OptimizerStage { OptimizerStage::BeforeBuildStream }
132+
///
133+
/// fn optimize(
134+
/// &self,
135+
/// _ctx: &AccessPlanContext<'_>,
136+
/// mut plan: ParquetAccessPlan,
137+
/// ) -> Result<ParquetAccessPlan> {
138+
/// for (idx, count) in (0..plan.len()).step_by(2).zip(std::iter::repeat(())) {
139+
/// let _ = count;
140+
/// plan.skip(idx);
141+
/// }
142+
/// Ok(plan)
143+
/// }
144+
/// }
145+
/// ```
146+
pub trait ParquetAccessPlanOptimizer: Debug + Send + Sync {
147+
/// At which stage of the opener this optimizer runs.
148+
fn stage(&self) -> OptimizerStage;
149+
150+
/// Refine `plan` for `ctx`. Returning `plan` unchanged is the no-op.
151+
fn optimize(
152+
&self,
153+
ctx: &AccessPlanContext<'_>,
154+
plan: ParquetAccessPlan,
155+
) -> Result<ParquetAccessPlan>;
156+
}
157+
158+
/// Run all `optimizers` whose [`OptimizerStage`] matches `ctx.stage`.
159+
pub(crate) fn run_stage(
160+
optimizers: &[Arc<dyn ParquetAccessPlanOptimizer>],
161+
ctx: &AccessPlanContext<'_>,
162+
mut plan: ParquetAccessPlan,
163+
) -> Result<ParquetAccessPlan> {
164+
for opt in optimizers {
165+
if opt.stage() == ctx.stage {
166+
plan = opt.optimize(ctx, plan)?;
167+
}
168+
}
169+
Ok(plan)
170+
}

datafusion/datasource-parquet/src/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#![cfg_attr(test, allow(clippy::needless_pass_by_value))]
2626

2727
pub mod access_plan;
28+
pub mod access_plan_optimizer;
2829
pub mod file_format;
2930
pub mod metadata;
3031
mod metrics;
@@ -39,6 +40,9 @@ mod supported_predicates;
3940
mod writer;
4041

4142
pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
43+
pub use access_plan_optimizer::{
44+
AccessPlanContext, OptimizerStage, ParquetAccessPlanOptimizer,
45+
};
4246
pub use file_format::*;
4347
pub use metrics::ParquetFileMetrics;
4448
pub use page_filter::PagePruningAccessPlanFilter;
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! [`EarlyStoppingStream`] terminates a Parquet file scan when a dynamic
19+
//! filter narrows after the scan has already started.
20+
21+
use std::pin::Pin;
22+
use std::task::{Context, Poll};
23+
24+
use arrow::array::RecordBatch;
25+
use datafusion_common::Result;
26+
use datafusion_physical_plan::metrics::PruningMetrics;
27+
use datafusion_pruning::FilePruner;
28+
use futures::{Stream, StreamExt, ready};
29+
30+
/// Wraps an inner RecordBatchStream and a [`FilePruner`]
31+
///
32+
/// This can terminate the scan early when some dynamic filters is updated after
33+
/// the scan starts, so we discover after the scan starts that the file can be
34+
/// pruned (can't have matching rows).
35+
pub(super) struct EarlyStoppingStream<S> {
36+
/// Has the stream finished processing? All subsequent polls will return
37+
/// None
38+
done: bool,
39+
file_pruner: FilePruner,
40+
files_ranges_pruned_statistics: PruningMetrics,
41+
/// The inner stream
42+
inner: S,
43+
}
44+
45+
impl<S> EarlyStoppingStream<S> {
46+
pub(super) fn new(
47+
stream: S,
48+
file_pruner: FilePruner,
49+
files_ranges_pruned_statistics: PruningMetrics,
50+
) -> Self {
51+
Self {
52+
done: false,
53+
inner: stream,
54+
file_pruner,
55+
files_ranges_pruned_statistics,
56+
}
57+
}
58+
}
59+
60+
impl<S> EarlyStoppingStream<S>
61+
where
62+
S: Stream<Item = Result<RecordBatch>> + Unpin,
63+
{
64+
fn check_prune(&mut self, input: Result<RecordBatch>) -> Result<Option<RecordBatch>> {
65+
let batch = input?;
66+
67+
// Since dynamic filters may have been updated, see if we can stop
68+
// reading this stream entirely.
69+
if self.file_pruner.should_prune()? {
70+
self.files_ranges_pruned_statistics.add_pruned(1);
71+
// Previously this file range has been counted as matched
72+
self.files_ranges_pruned_statistics.subtract_matched(1);
73+
self.done = true;
74+
Ok(None)
75+
} else {
76+
// Return the adapted batch
77+
Ok(Some(batch))
78+
}
79+
}
80+
}
81+
82+
impl<S> Stream for EarlyStoppingStream<S>
83+
where
84+
S: Stream<Item = Result<RecordBatch>> + Unpin,
85+
{
86+
type Item = Result<RecordBatch>;
87+
88+
fn poll_next(
89+
mut self: Pin<&mut Self>,
90+
cx: &mut Context<'_>,
91+
) -> Poll<Option<Self::Item>> {
92+
if self.done {
93+
return Poll::Ready(None);
94+
}
95+
match ready!(self.inner.poll_next_unpin(cx)) {
96+
None => {
97+
// input done
98+
self.done = true;
99+
Poll::Ready(None)
100+
}
101+
Some(input_batch) => {
102+
let output = self.check_prune(input_batch);
103+
Poll::Ready(output.transpose())
104+
}
105+
}
106+
}
107+
}

0 commit comments

Comments
 (0)