Skip to content

Commit 393c03f

Browse files
committed
Address feedback from @adriangb
1 parent 8cd86f8 commit 393c03f

File tree

3 files changed

+47
-22
lines changed

3 files changed

+47
-22
lines changed

datafusion/datasource-parquet/src/source.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,7 @@ impl FileSource for ParquetSource {
518518
_partition: usize,
519519
) -> datafusion_common::Result<Arc<dyn FileOpener>> {
520520
datafusion_common::internal_err!(
521-
"ParquetSource::create_file_opener called but it supports the Morsel API"
521+
"ParquetSource::create_file_opener called but it supports the Morsel API, please use that instead"
522522
)
523523
}
524524

datafusion/datasource/src/file_stream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -748,7 +748,7 @@ mod tests {
748748
}
749749

750750
/// Verifies that a planner can traverse two sequential I/O phases before
751-
/// producing one batch (similar to Parquet which does this0.
751+
/// producing one batch, similar to Parquet.
752752
#[tokio::test]
753753
async fn morsel_two_ios_one_batch() -> Result<()> {
754754
let test = FileStreamMorselTest::new().with_file(

datafusion/datasource/src/file_stream/scan_state.rs

Lines changed: 45 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,39 @@ use futures::{FutureExt as _, StreamExt as _};
2828

2929
use super::{FileStreamMetrics, OnError};
3030

31-
/// State [`FileStreamState::Scan`]
31+
/// State [`FileStreamState::Scan`].
3232
///
33-
/// Groups together ready planners, ready morsels, the active reader,
34-
/// pending planner I/O, the remaining files and limit, and the metrics
35-
/// associated with processing that work.
33+
/// There is one `ScanState` per `FileStream`, and thus per output partition.
34+
///
35+
/// It groups together the lifecycle of scanning that partition's files:
36+
/// unopened files, CPU-ready planners, pending planner I/O, ready morsels,
37+
/// the active reader, and the metrics associated with processing that work.
38+
///
39+
/// # State Transitions
40+
///
41+
/// ```text
42+
/// file_iter
43+
/// |
44+
/// v
45+
/// morselizer.plan_file(file)
46+
/// |
47+
/// v
48+
/// ready_planners ---> plan() ---> ready_morsels ---> into_stream() ---> reader ---> RecordBatches
49+
/// ^ |
50+
/// | v
51+
/// | pending_planner
52+
/// | |
53+
/// | v
54+
/// +-------- poll until ready
55+
/// ```
3656
///
3757
/// [`FileStreamState::Scan`]: super::FileStreamState::Scan
3858
pub(super) struct ScanState {
3959
/// Files that still need to be planned.
4060
file_iter: VecDeque<PartitionedFile>,
41-
/// Remaining record limit, if any.
61+
/// Remaining row limit, if any.
4262
remain: Option<usize>,
43-
/// The morselizer used to plan files
63+
/// The morselizer used to plan files.
4464
morselizer: Box<dyn Morselizer>,
4565
/// Behavior if opening or scanning a file fails.
4666
on_error: OnError,
@@ -50,7 +70,7 @@ pub(super) struct ScanState {
5070
ready_morsels: VecDeque<Box<dyn Morsel>>,
5171
/// The active reader, if any.
5272
reader: Option<BoxStream<'static, Result<RecordBatch>>>,
53-
/// Planner currently doing I/O
73+
/// The single planner currently blocked on I/O, if any.
5474
pending_planner: Option<PendingMorselPlanner>,
5575
/// Metrics for the active scan queues.
5676
metrics: FileStreamMetrics,
@@ -83,8 +103,14 @@ impl ScanState {
83103
self.on_error = on_error;
84104
}
85105

86-
/// Drives one iteration of the active scan state, reading from morsels,
87-
/// planners, pending planner I/O, or unopened files from `self`.
106+
/// Drives one iteration of the active scan state.
107+
///
108+
/// Work is attempted in this order:
109+
/// 1. resolve any pending planner I/O
110+
/// 2. poll the active reader
111+
/// 3. turn a ready morsel into the active reader
112+
/// 4. run CPU planning on a ready planner
113+
/// 5. morselize the next unopened file
88114
///
89115
/// The returned [`ScanAndReturn`] tells `poll_inner` how to update the
90116
/// outer `FileStreamState`.
@@ -120,17 +146,16 @@ impl ScanState {
120146
}
121147
}
122148

123-
// Next try and get the net batch from the active reader, if any
149+
// Next, try to get the next batch from the active reader, if any.
124150
if let Some(reader) = self.reader.as_mut() {
125151
match reader.poll_next_unpin(cx) {
126-
// According to the API contract, readers should always be ready
127-
// but in practice they may actually be waiting on IO, and if
128-
// that happens wait for it here.
152+
// Morsels should ideally only expose ready-to-decode streams,
153+
// but tolerate pending readers here.
129154
Poll::Pending => return ScanAndReturn::Return(Poll::Pending),
130155
Poll::Ready(Some(Ok(batch))) => {
131156
self.metrics.time_scanning_until_data.stop();
132157
self.metrics.time_scanning_total.stop();
133-
// check limit
158+
// Apply any remaining row limit.
134159
let (batch, finished) = match &mut self.remain {
135160
Some(remain) => {
136161
if *remain > batch.num_rows() {
@@ -179,21 +204,20 @@ impl ScanState {
179204
}
180205
}
181206

182-
// Don't have a reader but have morsels ready to turn into a reader, so do that.
207+
// No active reader, but a morsel is ready to become the reader.
183208
if let Some(morsel) = self.ready_morsels.pop_front() {
184-
self.metrics.files_opened.add(1);
185209
self.metrics.time_opening.stop();
186210
self.metrics.time_scanning_until_data.start();
187211
self.metrics.time_scanning_total.start();
188212
self.reader = Some(morsel.into_stream());
189213
return ScanAndReturn::Continue;
190214
}
191215

192-
// Don't have a morsel or stream, so try and plan some more morsels
216+
// No reader or morsel, so try to produce more work via CPU planning.
193217
if let Some(planner) = self.ready_planners.pop_front() {
194218
return match planner.plan() {
195219
Ok(Some(mut plan)) => {
196-
// Get all morsels and planners and try again
220+
// Queue any newly-ready morsels, planners, or planner I/O.
197221
self.ready_morsels.extend(plan.take_morsels());
198222
self.ready_planners.extend(plan.take_ready_planners());
199223
if let Some(pending_planner) = plan.take_pending_planner() {
@@ -220,7 +244,7 @@ impl ScanState {
220244
};
221245
}
222246

223-
// No planners, morsels, or active reader, so try and open the next file and plan it.
247+
// No outstanding work remains, so morselize the next unopened file.
224248
let part_file = match self.file_iter.pop_front() {
225249
Some(part_file) => part_file,
226250
None => return ScanAndReturn::Done(None),
@@ -229,6 +253,7 @@ impl ScanState {
229253
self.metrics.time_opening.start();
230254
match self.morselizer.plan_file(part_file) {
231255
Ok(planner) => {
256+
self.metrics.files_opened.add(1);
232257
self.ready_planners.push_back(planner);
233258
ScanAndReturn::Continue
234259
}
@@ -247,7 +272,7 @@ impl ScanState {
247272

248273
/// What should be done on the next iteration of [`ScanState::poll_scan`]?
249274
pub(super) enum ScanAndReturn {
250-
/// Poll again
275+
/// Poll again.
251276
Continue,
252277
/// Return the provided result without changing the outer state.
253278
Return(Poll<Option<Result<RecordBatch>>>),

0 commit comments

Comments
 (0)