Skip to content

Commit bed1fb4

Browse files
committed
update comments
1 parent 5600774 commit bed1fb4

File tree

2 files changed

+32
-27
lines changed

2 files changed

+32
-27
lines changed

datafusion/datasource-parquet/src/opener.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,8 @@ impl ParquetOpenState {
427427

428428
/// Adapter for a [`MorselPlanner`] to the [`FileOpener`] API
429429
///
430-
/// Implements state machine described in [`ParquetOpenState`]
430+
/// Compatibility adapter that drives a morsel planner through the
431+
/// [`FileOpener`] API.
431432
struct ParquetOpenFuture {
432433
planner: Option<Box<dyn MorselPlanner>>,
433434
pending_io: Option<PendingMorselPlanner>,
@@ -452,29 +453,31 @@ impl Future for ParquetOpenFuture {
452453

453454
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
454455
loop {
455-
// If waiting on IO, poll
456+
// If planner I/O completed, resume with the returned planner.
456457
if let Some(io_future) = self.pending_io.as_mut() {
457458
let maybe_planner = ready!(io_future.poll_unpin(cx));
458-
// future has resolved. Clear pending io before processing the
459-
// result to ensure that if the future returns an error, we
460-
// don't end up in a state where both are set and accidentally
461-
// ignore the error on the next poll
459+
// Clear `pending_io` before handling the result so an error
460+
// cannot leave both continuation paths populated.
462461
self.pending_io = None;
463462
self.planner = Some(maybe_planner?);
464463
}
465464

466-
// have a morsel ready to go, return that
465+
// If a stream morsel is ready, return it.
467466
if let Some(morsel) = self.ready_morsels.pop_front() {
468467
return Poll::Ready(Ok(morsel.into_stream()));
469468
}
470469

470+
// This shim must always own either a planner, a pending planner
471+
// future, or a ready morsel. Reaching this branch means the
472+
// continuation was lost.
471473
let Some(planner) = self.planner.take() else {
472-
// any path that leave planner as non
473474
return Poll::Ready(internal_err!(
474475
"ParquetOpenFuture polled after completion"
475476
));
476477
};
477478

479+
// Planner completed without producing a stream morsel.
480+
// (e.g. all row groups were pruned)
478481
let Some(mut plan) = planner.plan()? else {
479482
return Poll::Ready(Ok(futures::stream::empty().boxed()));
480483
};
@@ -520,7 +523,7 @@ impl Morsel for ParquetStreamMorsel {
520523
}
521524
}
522525

523-
/// Planner for opening a single parquet file via the morsel APIs.
526+
/// Per-file planner that owns the current [`ParquetOpenState`].
524527
struct ParquetMorselPlanner {
525528
/// Ready to perform CPU-only planning work.
526529
state: ParquetOpenState,

datafusion/datasource/src/morsel/mod.rs

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,10 @@ pub trait Morsel: Send + Debug {
5353
pub trait Morselizer: Send + Sync + Debug {
5454
/// Return the initial [`MorselPlanner`] for this file.
5555
///
56-
/// "Morselzing" a file may involve CPU work, such as parsing parquet
56+
/// Morselizing a file may involve CPU work, such as parsing parquet
5757
/// metadata and evaluating pruning predicates. It should NOT do any I/O
58-
/// work, such as reading from the file. If I/O is required, it should
59-
/// return a future that the caller can poll to drive the I/O work to
60-
/// completion, and once the future is complete, the caller can call
61-
/// `plan_file` again for a different file.
58+
/// work, such as reading from the file. Any needed I/O should be done using
59+
/// [`MorselPlan::with_pending_planner`].
6260
fn plan_file(&self, file: PartitionedFile) -> Result<Box<dyn MorselPlanner>>;
6361
}
6462

@@ -84,9 +82,9 @@ pub trait MorselPlanner: Send + Debug {
8482
/// parquet metadata and evaluating pruning predicates.
8583
///
8684
/// It should NOT do any I/O work, such as reading from the file. If I/O is
87-
/// required, the returned [`MorselPlan`] should contain a future that the
88-
/// caller polls to drive the I/O work to completion. Once the future is
89-
/// complete, the caller can call `plan` again to get the next morsels.
85+
/// required, the returned [`MorselPlan`] should contain a pending planner
86+
/// future that the caller polls to drive the I/O work to completion. Once
87+
/// that future resolves, it yields a planner ready for work.
9088
///
9189
/// Note this function is **not async** to make it explicitly clear that if
9290
/// I/O is required, it should be done in the returned `io_future`.
@@ -118,11 +116,11 @@ pub struct MorselPlan {
118116
morsels: Vec<Box<dyn Morsel>>,
119117
/// Planners that are ready for CPU work.
120118
ready_planners: Vec<Box<dyn MorselPlanner>>,
121-
/// A future that is doing IO that will resolve to MorselPlanner
119+
/// A future with planner I/O that resolves to a CPU ready planner.
122120
///
123121
/// DataFusion will poll this future occasionally to drive the I/O work to
124-
/// completion. Once the future resolves, DataFusion will call `plan` again
125-
/// to get the next morsels.
122+
/// completion. Once it resolves, planning continues with the returned
123+
/// planner.
126124
pending_planner: Option<PendingMorselPlanner>,
127125
}
128126

@@ -144,7 +142,7 @@ impl MorselPlan {
144142
self
145143
}
146144

147-
/// Set the pending future for planning
145+
/// Set the pending planner for an I/O phase.
148146
pub fn with_pending_planner<F>(mut self, io_future: F) -> Self
149147
where
150148
F: Future<Output = Result<Box<dyn MorselPlanner>>> + Send + 'static,
@@ -153,7 +151,7 @@ impl MorselPlan {
153151
self
154152
}
155153

156-
/// Set the pending future for planning
154+
/// Set the pending planner for an I/O phase.
157155
pub fn set_pending_planner<F>(&mut self, io_future: F)
158156
where
159157
F: Future<Output = Result<Box<dyn MorselPlanner>>> + Send + 'static,
@@ -188,14 +186,18 @@ pub struct PendingMorselPlanner {
188186
}
189187

190188
impl PendingMorselPlanner {
191-
/// Create a new pending morselization I/O future
189+
/// Create a new pending planner future.
192190
///
193191
/// Example
194192
/// ```
195-
/// # use datafusion_datasource::morsel::PendingMorselPlanner;
193+
/// # use datafusion_common::DataFusionError;
194+
/// # use datafusion_datasource::morsel::{MorselPlanner, PendingMorselPlanner};
196195
/// let work = async move {
197-
/// // Do I/O work here
198-
/// # unimplemented!()
196+
/// let planner: Box<dyn MorselPlanner> = {
197+
/// // Do I/O work here, then return the next planner to run.
198+
/// # unimplemented!();
199+
/// };
200+
/// Ok(planner) as Result<_, DataFusionError>;
199201
/// };
200202
/// let pending_io = PendingMorselPlanner::new(work);
201203
/// ```
@@ -214,7 +216,7 @@ impl PendingMorselPlanner {
214216
}
215217
}
216218

217-
/// wraps the inner future
219+
/// Forwards polling to the underlying future.
218220
impl Future for PendingMorselPlanner {
219221
type Output = Result<Box<dyn MorselPlanner>>;
220222
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

0 commit comments

Comments
 (0)