Skip to content

Commit b4739e5

Browse files
adriangb and claude authored
refactor(parquet-datasource): split opener.rs into an opener/ module (#22346)
## Which issue does this PR close?

Relates to the discussion in #22024 about the Parquet datasource crate becoming hard to navigate. Split out of #22156, which bundled several code-motion moves into one PR — this is one of three smaller, independently-reviewable PRs that replace it.

## Rationale for this change

`opener.rs` had grown to ~2,700 LOC, bundling several distinct responsibilities into one file. That makes it hard to read and hard to review changes in isolation. This PR is **pure code motion**: no behavior change and no public API change.

## What changes are included in this PR?

Splits `opener.rs` into an `opener/` directory module:

- `opener/early_stop.rs` — `EarlyStoppingStream`, the dynamic-filter early-termination wrapper applied at the end of `build_stream`.
- `opener/encryption.rs` — `EncryptionContext` and the `ParquetMorselizer::get_encryption_context` helpers, isolating the `#[cfg(feature = "parquet_encryption")]` gating that previously bled through the main file.

`opener.rs` becomes `opener/mod.rs`.

Note: #22156 originally also extracted an `opener/push_decoder_stream.rs`. That move is now obsolete — #22289 has since extracted `PushDecoderStreamState` into `push_decoder.rs` — so it is dropped here.

## Are these changes tested?

Yes, covered by existing tests. `cargo test -p datafusion-datasource-parquet --all-features` (122 passing) and `cargo clippy -p datafusion-datasource-parquet --all-targets --all-features -- -D warnings` both pass.

## Are there any user-facing changes?

No. `opener` was already a private module; this only reorganizes files inside the crate.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 2d1d53e commit b4739e5

3 files changed

Lines changed: 219 additions & 153 deletions

File tree

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! [`EarlyStoppingStream`] terminates a Parquet file scan when a dynamic
19+
//! filter narrows after the scan has already started.
20+
21+
use std::pin::Pin;
22+
use std::task::{Context, Poll};
23+
24+
use arrow::array::RecordBatch;
25+
use datafusion_common::Result;
26+
use datafusion_physical_plan::metrics::PruningMetrics;
27+
use datafusion_pruning::FilePruner;
28+
use futures::{Stream, StreamExt, ready};
29+
30+
/// Wraps an inner RecordBatchStream and a [`FilePruner`]
31+
///
32+
/// This can terminate the scan early when some dynamic filters is updated after
33+
/// the scan starts, so we discover after the scan starts that the file can be
34+
/// pruned (can't have matching rows).
35+
pub(super) struct EarlyStoppingStream<S> {
36+
/// Has the stream finished processing? All subsequent polls will return
37+
/// None
38+
done: bool,
39+
file_pruner: FilePruner,
40+
files_ranges_pruned_statistics: PruningMetrics,
41+
/// The inner stream
42+
inner: S,
43+
}
44+
45+
impl<S> EarlyStoppingStream<S> {
46+
pub(super) fn new(
47+
stream: S,
48+
file_pruner: FilePruner,
49+
files_ranges_pruned_statistics: PruningMetrics,
50+
) -> Self {
51+
Self {
52+
done: false,
53+
inner: stream,
54+
file_pruner,
55+
files_ranges_pruned_statistics,
56+
}
57+
}
58+
}
59+
60+
impl<S> EarlyStoppingStream<S>
61+
where
62+
S: Stream<Item = Result<RecordBatch>> + Unpin,
63+
{
64+
fn check_prune(&mut self, input: Result<RecordBatch>) -> Result<Option<RecordBatch>> {
65+
let batch = input?;
66+
67+
// Since dynamic filters may have been updated, see if we can stop
68+
// reading this stream entirely.
69+
if self.file_pruner.should_prune()? {
70+
self.files_ranges_pruned_statistics.add_pruned(1);
71+
// Previously this file range has been counted as matched
72+
self.files_ranges_pruned_statistics.subtract_matched(1);
73+
self.done = true;
74+
Ok(None)
75+
} else {
76+
// Return the adapted batch
77+
Ok(Some(batch))
78+
}
79+
}
80+
}
81+
82+
impl<S> Stream for EarlyStoppingStream<S>
83+
where
84+
S: Stream<Item = Result<RecordBatch>> + Unpin,
85+
{
86+
type Item = Result<RecordBatch>;
87+
88+
fn poll_next(
89+
mut self: Pin<&mut Self>,
90+
cx: &mut Context<'_>,
91+
) -> Poll<Option<Self::Item>> {
92+
if self.done {
93+
return Poll::Ready(None);
94+
}
95+
match ready!(self.inner.poll_next_unpin(cx)) {
96+
None => {
97+
// input done
98+
self.done = true;
99+
Poll::Ready(None)
100+
}
101+
Some(input_batch) => {
102+
let output = self.check_prune(input_batch);
103+
Poll::Ready(output.transpose())
104+
}
105+
}
106+
}
107+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Encryption context used during Parquet file open.
19+
//!
20+
//! Isolated here so the `#[cfg(feature = "parquet_encryption")]` gating does
21+
//! not pollute the rest of the opener module.
22+
23+
#[cfg(feature = "parquet_encryption")]
24+
use std::sync::Arc;
25+
26+
use datafusion_common::Result;
27+
#[cfg(feature = "parquet_encryption")]
28+
use datafusion_common::config::EncryptionFactoryOptions;
29+
#[cfg(feature = "parquet_encryption")]
30+
use datafusion_common::encryption::FileDecryptionProperties;
31+
#[cfg(feature = "parquet_encryption")]
32+
use datafusion_execution::parquet_encryption::EncryptionFactory;
33+
34+
use super::ParquetMorselizer;
35+
36+
#[derive(Default)]
37+
pub(super) struct EncryptionContext {
38+
#[cfg(feature = "parquet_encryption")]
39+
file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
40+
#[cfg(feature = "parquet_encryption")]
41+
encryption_factory: Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)>,
42+
}
43+
44+
#[cfg(feature = "parquet_encryption")]
impl EncryptionContext {
    /// Stores the given decryption properties and encryption factory.
    fn new(
        file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
        encryption_factory: Option<(
            Arc<dyn EncryptionFactory>,
            EncryptionFactoryOptions,
        )>,
    ) -> Self {
        EncryptionContext {
            file_decryption_properties,
            encryption_factory,
        }
    }

    /// Resolves the decryption properties to use for `file_location`.
    ///
    /// Explicit properties stored on the context take precedence. Otherwise
    /// the encryption factory, when one is present, is asked for them. With
    /// neither available the result is `Ok(None)`.
    pub(super) async fn get_file_decryption_properties(
        &self,
        file_location: &object_store::path::Path,
    ) -> Result<Option<Arc<FileDecryptionProperties>>> {
        if let Some(explicit) = &self.file_decryption_properties {
            return Ok(Some(Arc::clone(explicit)));
        }

        let Some((factory, factory_options)) = &self.encryption_factory else {
            return Ok(None);
        };

        let resolved = factory
            .get_file_decryption_properties(factory_options, file_location)
            .await?;
        Ok(resolved)
    }
}
76+
77+
#[cfg(not(feature = "parquet_encryption"))]
78+
#[expect(dead_code)]
79+
impl EncryptionContext {
80+
pub(super) async fn get_file_decryption_properties(
81+
&self,
82+
_file_location: &object_store::path::Path,
83+
) -> Result<
84+
Option<std::sync::Arc<datafusion_common::encryption::FileDecryptionProperties>>,
85+
> {
86+
Ok(None)
87+
}
88+
}
89+
90+
impl ParquetMorselizer {
91+
#[cfg(feature = "parquet_encryption")]
92+
pub(super) fn get_encryption_context(&self) -> EncryptionContext {
93+
EncryptionContext::new(
94+
self.file_decryption_properties.clone(),
95+
self.encryption_factory.clone(),
96+
)
97+
}
98+
99+
#[cfg(not(feature = "parquet_encryption"))]
100+
#[expect(dead_code)]
101+
pub(super) fn get_encryption_context(&self) -> EncryptionContext {
102+
EncryptionContext::default()
103+
}
104+
}

0 commit comments

Comments
 (0)