Skip to content

Commit ed29b29

Browse files
committed
Address feedback from @adriangb
1 parent 8cd86f8 commit ed29b29

File tree

3 files changed

+29
-22
lines changed

3 files changed

+29
-22
lines changed

datafusion/datasource-parquet/src/source.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,7 @@ impl FileSource for ParquetSource {
518518
_partition: usize,
519519
) -> datafusion_common::Result<Arc<dyn FileOpener>> {
520520
datafusion_common::internal_err!(
521-
"ParquetSource::create_file_opener called but it supports the Morsel API"
521+
"ParquetSource::create_file_opener called but it supports the Morsel API, please use that instead"
522522
)
523523
}
524524

datafusion/datasource/src/file_stream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -748,7 +748,7 @@ mod tests {
748748
}
749749

750750
/// Verifies that a planner can traverse two sequential I/O phases before
751-
/// producing one batch (similar to Parquet which does this0.
751+
/// producing one batch, similar to Parquet.
752752
#[tokio::test]
753753
async fn morsel_two_ios_one_batch() -> Result<()> {
754754
let test = FileStreamMorselTest::new().with_file(

datafusion/datasource/src/file_stream/scan_state.rs

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,21 @@ use futures::{FutureExt as _, StreamExt as _};
2828

2929
use super::{FileStreamMetrics, OnError};
3030

31-
/// State [`FileStreamState::Scan`]
31+
/// State [`FileStreamState::Scan`].
3232
///
33-
/// Groups together ready planners, ready morsels, the active reader,
34-
/// pending planner I/O, the remaining files and limit, and the metrics
35-
/// associated with processing that work.
33+
/// There is one `ScanState` per `FileStream`, and thus per output partition.
34+
///
35+
/// It groups together the lifecycle of scanning that partition's files:
36+
/// unopened files, CPU-ready planners, pending planner I/O, ready morsels,
37+
/// the active reader, and the metrics associated with processing that work.
3638
///
3739
/// [`FileStreamState::Scan`]: super::FileStreamState::Scan
3840
pub(super) struct ScanState {
3941
/// Files that still need to be planned.
4042
file_iter: VecDeque<PartitionedFile>,
41-
/// Remaining record limit, if any.
43+
/// Remaining row limit, if any.
4244
remain: Option<usize>,
43-
/// The morselizer used to plan files
45+
/// The morselizer used to plan files.
4446
morselizer: Box<dyn Morselizer>,
4547
/// Behavior if opening or scanning a file fails.
4648
on_error: OnError,
@@ -50,7 +52,7 @@ pub(super) struct ScanState {
5052
ready_morsels: VecDeque<Box<dyn Morsel>>,
5153
/// The active reader, if any.
5254
reader: Option<BoxStream<'static, Result<RecordBatch>>>,
53-
/// Planner currently doing I/O
55+
/// The single planner currently blocked on I/O, if any.
5456
pending_planner: Option<PendingMorselPlanner>,
5557
/// Metrics for the active scan queues.
5658
metrics: FileStreamMetrics,
@@ -83,8 +85,14 @@ impl ScanState {
8385
self.on_error = on_error;
8486
}
8587

86-
/// Drives one iteration of the active scan state, reading from morsels,
87-
/// planners, pending planner I/O, or unopened files from `self`.
88+
/// Drives one iteration of the active scan state.
89+
///
90+
/// Work is attempted in this order:
91+
/// 1. resolve any pending planner I/O
92+
/// 2. poll the active reader
93+
/// 3. turn a ready morsel into the active reader
94+
/// 4. run CPU planning on a ready planner
95+
/// 5. morselize the next unopened file
8896
///
8997
/// The returned [`ScanAndReturn`] tells `poll_inner` how to update the
9098
/// outer `FileStreamState`.
@@ -120,17 +128,16 @@ impl ScanState {
120128
}
121129
}
122130

123-
// Next try and get the net batch from the active reader, if any
131+
// Next try and get the next batch from the active reader, if any.
124132
if let Some(reader) = self.reader.as_mut() {
125133
match reader.poll_next_unpin(cx) {
126-
// According to the API contract, readers should always be ready
127-
// but in practice they may actually be waiting on IO, and if
128-
// that happens wait for it here.
134+
// Morsels should ideally only expose ready-to-decode streams,
135+
// but tolerate pending readers here.
129136
Poll::Pending => return ScanAndReturn::Return(Poll::Pending),
130137
Poll::Ready(Some(Ok(batch))) => {
131138
self.metrics.time_scanning_until_data.stop();
132139
self.metrics.time_scanning_total.stop();
133-
// check limit
140+
// Apply any remaining row limit.
134141
let (batch, finished) = match &mut self.remain {
135142
Some(remain) => {
136143
if *remain > batch.num_rows() {
@@ -179,21 +186,20 @@ impl ScanState {
179186
}
180187
}
181188

182-
// Don't have a reader but have morsels ready to turn into a reader, so do that.
189+
// No active reader, but a morsel is ready to become the reader.
183190
if let Some(morsel) = self.ready_morsels.pop_front() {
184-
self.metrics.files_opened.add(1);
185191
self.metrics.time_opening.stop();
186192
self.metrics.time_scanning_until_data.start();
187193
self.metrics.time_scanning_total.start();
188194
self.reader = Some(morsel.into_stream());
189195
return ScanAndReturn::Continue;
190196
}
191197

192-
// Don't have a morsel or stream, so try and plan some more morsels
198+
// No reader or morsel, so try to produce more work via CPU planning.
193199
if let Some(planner) = self.ready_planners.pop_front() {
194200
return match planner.plan() {
195201
Ok(Some(mut plan)) => {
196-
// Get all morsels and planners and try again
202+
// Queue any newly-ready morsels, planners, or planner I/O.
197203
self.ready_morsels.extend(plan.take_morsels());
198204
self.ready_planners.extend(plan.take_ready_planners());
199205
if let Some(pending_planner) = plan.take_pending_planner() {
@@ -220,7 +226,7 @@ impl ScanState {
220226
};
221227
}
222228

223-
// No planners, morsels, or active reader, so try and open the next file and plan it.
229+
// No outstanding work remains, so morselize the next unopened file.
224230
let part_file = match self.file_iter.pop_front() {
225231
Some(part_file) => part_file,
226232
None => return ScanAndReturn::Done(None),
@@ -229,6 +235,7 @@ impl ScanState {
229235
self.metrics.time_opening.start();
230236
match self.morselizer.plan_file(part_file) {
231237
Ok(planner) => {
238+
self.metrics.files_opened.add(1);
232239
self.ready_planners.push_back(planner);
233240
ScanAndReturn::Continue
234241
}
@@ -247,7 +254,7 @@ impl ScanState {
247254

248255
/// What should be done on the next iteration of [`ScanState::poll_scan`]?
249256
pub(super) enum ScanAndReturn {
250-
/// Poll again
257+
/// Poll again.
251258
Continue,
252259
/// Return the provided result without changing the outer state.
253260
Return(Poll<Option<Result<RecordBatch>>>),

0 commit comments

Comments
 (0)