Rewrite FileStream in terms of Morsel API #21342
Conversation
Force-pushed 816d243 to 3346af7
| /// This groups together ready planners, ready morsels, the active reader, | ||
| /// pending planner I/O, the remaining files and limit, and the metrics | ||
| /// associated with processing that work. | ||
| pub(super) struct ScanState { |
This is the new inner state machine for FileStream
I think some more diagrams in the docstring of the struct and/or fields could help. I'm trying to wrap my head around how the IO queue and such work.
| use std::sync::Arc; | ||
| use std::sync::mpsc::{self, Receiver, TryRecvError}; | ||
|
|
||
| /// Adapt a legacy [`FileOpener`] to the morsel API. |
This is an adapter so that existing FileOpeners continue to have the same behavior
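To illustrate the shape of that adapter (a sketch only, with hypothetical stand-in traits; the real `FileOpener`/`Morselizer` signatures involve async streams and `RecordBatch`es):

```rust
// Hypothetical sketch of adapting a legacy "opener" trait to a newer
// "morselizer" trait, analogous to FileOpenerMorselizer. The trait
// names and String/Vec stand-ins are illustrative, not DataFusion's API.

/// Legacy API: opening a file yields all of its output directly.
trait LegacyOpener {
    fn open(&self, file: &str) -> Vec<String>; // stand-in for a batch stream
}

/// New API: a file is first "morselized" into units of work.
trait Morselizer {
    fn morselize(&self, file: &str) -> Vec<String>; // stand-in for morsels
}

/// Adapter: any LegacyOpener acts as a Morselizer that produces a single
/// unit of work covering the whole file, preserving the legacy behavior.
struct OpenerMorselizer<O: LegacyOpener>(O);

impl<O: LegacyOpener> Morselizer for OpenerMorselizer<O> {
    fn morselize(&self, file: &str) -> Vec<String> {
        self.0.open(file)
    }
}

struct EchoOpener;
impl LegacyOpener for EchoOpener {
    fn open(&self, file: &str) -> Vec<String> {
        vec![format!("batch-from-{file}")]
    }
}
```

The point of the adapter is that existing `FileOpener` implementations keep working unchanged while `FileStream` itself only ever speaks the new API.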
| @@ -0,0 +1,556 @@ | |||
| // Licensed to the Apache Software Foundation (ASF) under one | |||
This is testing infrastructure to write the snapshot tests
| return Poll::Ready(Some(Err(err))); | ||
| } | ||
| } | ||
| FileStreamState::Scan { scan_state: queue } => { |
moved the inner state machine into a separate module/struct to keep indentation under control and encapsulate the complexity somewhat
| assert!(err.contains("FileStreamBuilder invalid partition index: 1")); | ||
| } | ||
|
|
||
| /// Verifies the simplest morsel-driven flow: one planner produces one |
Here are tests showing the sequence of calls to the various morsel APIs. I intend to use this framework to show how work can migrate from one stream to the other
Force-pushed b5c452a to d5a1f74
run benchmarks

🤖 Benchmark running (GKE): comparing alamb/file_stream_split (d5a1f74) to 1e93a67 (merge-base) using clickbench_partitioned
| all-features = true | ||
|
|
||
| [features] | ||
| backtrace = ["datafusion-common/backtrace"] |
I added this while debugging why the tests failed on CI but not locally (when this feature flag was on, the error messages got mangled).
I added a crate-level feature to enable the feature in datafusion-common so I could reproduce the error locally.
🤖 Benchmark running (GKE): comparing alamb/file_stream_split (d5a1f74) to 1e93a67 (merge-base) using tpch

🤖 Benchmark running (GKE): comparing alamb/file_stream_split (d5a1f74) to 1e93a67 (merge-base) using tpcds
Force-pushed d5a1f74 to b2c9bd6
🤖 Benchmark completed (GKE): tpch — base (merge-base) vs branch

🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch

🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch
adriangb
left a comment
Ran out of time for the last couple of files. A lot of the comments just track my thought process; I plan to go over them again to clarify my own understanding, but maybe they're helpful as input on how the code reads top to bottom for a first-time reader.
| /// Creates a `dyn Morselizer` based on given parameters. | ||
| /// | ||
| /// The default implementation preserves existing behavior by adapting the | ||
| /// legacy [`FileOpener`] API into a [`Morselizer`]. | ||
| /// | ||
| /// It is preferred to implement the [`Morselizer`] API directly by | ||
| /// implementing this method. | ||
| fn create_morselizer( | ||
| &self, | ||
| object_store: Arc<dyn ObjectStore>, | ||
| base_config: &FileScanConfig, | ||
| partition: usize, | ||
| ) -> Result<Box<dyn Morselizer>> { | ||
| let opener = self.create_file_opener(object_store, base_config, partition)?; | ||
| Ok(Box::new(FileOpenerMorselizer::new(opener))) | ||
| } |
| _partition: usize, | ||
| ) -> datafusion_common::Result<Arc<dyn FileOpener>> { | ||
| datafusion_common::internal_err!( | ||
| "ParquetSource::create_file_opener called but it supports the Morsel API" |
| "ParquetSource::create_file_opener called but it supports the Morsel API" | |
| "ParquetSource::create_file_opener called but it supports the Morsel API, please use that instead" |
Note that this will be a breaking change for folks using ParquetSource directly (which I believe @xudong963 / @zhuqi-lucas are based on #21290).
| /// Configure the [`FileOpener`] used to open files. | ||
| /// | ||
| /// This will overwrite any setting from [`Self::with_morselizer`] | ||
| pub fn with_file_opener(mut self, file_opener: Arc<dyn FileOpener>) -> Self { |
While I think it could make sense to keep FileOpener as a public API for building data sources (if we consider it simpler, for folks who don't care about perf), this method in particular seems like a mostly internal method (even if it is pub) that we might as well deprecate / remove.
This method is the way we could keep using FileOpener (as it is simpler)
I am not sure how we could still allow using FileOpener but not keep this method
| if let FileStreamState::Scan { scan_state } = &mut self.state { | ||
| scan_state.set_on_error(on_error); | ||
| } | ||
| self |
Currently this is the only state it makes sense to modify (the others are terminal states). But I did have to go check the FileStreamState enum to confirm. Might be worth either adding a comment here, or doing a match with FileStreamState::Error(_) | FileStreamState::Done(_) and adding a comment on top explaining those are terminal states, to force ourselves to handle new cases in the future if they were added. It would be an annoying bug to debug; worth the 1 LOC IMO.
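A minimal sketch of what that exhaustive match would look like (the enum and its fields here are simplified stand-ins, not the actual FileStreamState definition):

```rust
// Sketch: match exhaustively over the state enum so that adding a new
// variant later forces a compile error at this site. Simplified stand-in.

enum FileStreamState {
    Scan { on_error: bool },
    // Terminal states: once reached, no further configuration applies.
    Error,
    Done,
}

fn set_on_error(state: &mut FileStreamState, on_error: bool) {
    match state {
        FileStreamState::Scan { on_error: slot } => *slot = on_error,
        // Listing terminal states explicitly (instead of `_ => {}`)
        // forces new variants to be handled here in the future.
        FileStreamState::Error | FileStreamState::Done => {}
    }
}
```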
| /// The active reader, if any. | ||
| reader: Option<BoxStream<'static, Result<RecordBatch>>>, |
Is there one ScanState across all partitions or one per partition? I'm guessing the latter: file_iter: VecDeque<PartitionedFile> is the files for this partition, we pump all of the files into one output stream of RecordBatch (reader). But we can have multiple planners / morsels ready and merge those all into a single stream of RecordBatch on the way out.
One per partition
we pump all of the files into one output stream of RecordBatch (reader). But we can have multiple planners / morsels ready and merge those all into a single stream of RecordBatch on the way out.
My initial proposal (following @Dandandan 's original design) is that when possible the files are put into a shared queue so that when a FileStream is ready it gets the next file.
I think once we get that structure in place, we can contemplate more sophisticated designs (like one FileStream preparing a parquet file, and then divvying up the record batches between other cores)
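The shared-queue idea can be sketched roughly like this (a minimal stand-in; the real design would hand out `PartitionedFile`s and live inside the scan state, not a bare `String` queue):

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

// Sketch: partition streams pull the next file from a common queue
// instead of each owning a fixed slice of the file list, so whichever
// partition finishes first picks up the next file. Types are stand-ins.

#[derive(Clone)]
struct SharedFileQueue(Arc<Mutex<VecDeque<String>>>);

impl SharedFileQueue {
    fn new(files: impl IntoIterator<Item = String>) -> Self {
        Self(Arc::new(Mutex::new(files.into_iter().collect())))
    }

    /// Called by whichever partition stream is ready for more work.
    fn next_file(&self) -> Option<String> {
        self.0.lock().unwrap().pop_front()
    }
}
```

Each partition would hold a clone of the queue, so idle partitions naturally steal work from the shared pool.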
| } | ||
|
|
||
| if let Some(morsel) = self.ready_morsels.pop_front() { | ||
| self.metrics.files_opened.add(1); |
Does a morsel map to a file opened? I thought opening a file produces the morsels (i.e. this metric should be incremented elsewhere).
You are right -- the files opened should be for files "morselized"
| self.ready_morsels.extend(plan.take_morsels()); | ||
| self.ready_planners.extend(plan.take_planners()); |
I see now, a planner can produce more planners (this is how it cycles through IO and CPU)
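That drain loop can be sketched concretely (a toy model; `Plan`, `Planner`, and the `u32` morsels are hypothetical stand-ins for the real `MorselPlan`/`MorselPlanner`/`Morsel` types):

```rust
use std::collections::VecDeque;

// Sketch: stepping a planner yields both ready morsels and follow-up
// planners, which is how work can alternate between IO and CPU phases.

struct Plan {
    morsels: Vec<u32>,
    planners: Vec<Planner>,
}

struct Planner(u32);

impl Planner {
    fn step(self) -> Plan {
        if self.0 == 0 {
            Plan { morsels: vec![0], planners: vec![] }
        } else {
            // Produce one morsel and re-enqueue a follow-up planner.
            Plan { morsels: vec![self.0], planners: vec![Planner(self.0 - 1)] }
        }
    }
}

fn drain(seed: Planner) -> Vec<u32> {
    let mut ready_planners: VecDeque<Planner> = VecDeque::from([seed]);
    let mut ready_morsels: VecDeque<u32> = VecDeque::new();
    while let Some(p) = ready_planners.pop_front() {
        let plan = p.step();
        // Mirrors the quoted lines: extend both queues from one plan.
        ready_morsels.extend(plan.morsels);
        ready_planners.extend(plan.planners);
    }
    ready_morsels.into_iter().collect()
}
```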
OK, the first PR in the chain is ready for review (that is basically 50% of this PR)
| /// The file-format-specific morselizer used to plan files. | ||
| morselizer: Box<dyn Morselizer>, | ||
| /// Describes the behavior if opening or scanning a file fails. | ||
| on_error: OnError, | ||
| /// CPU-ready planners for the current file. | ||
| ready_planners: VecDeque<Box<dyn MorselPlanner>>, | ||
| /// Ready morsels for the current file. | ||
| ready_morsels: VecDeque<Box<dyn Morsel>>, |
My understanding of the state machine is File -> MorselPlanner (via Morselizer, an IO operation) and then MorselPlanner -> Morsel (a CPU operation) and finally Morsel -> RecordBatch(es) (IO). Is that right?
That is basically right, except the last step Morsel --> RecordBatch(es) should not have IO (though you are right that it does now)
Operations like Morsel -> RecordBatch are still a mix of IO/CPU (especially with filter pushdown on).
My idea is that we change that so that the RecordBatches don't actually flow until we have all data buffered and ready to decode
This is possible to do with the arrow-rs parquet reader when:
- Each MorselPlanner is for a single RowGroup
- We don't produce the morsel stream until we start getting batches (there is some version of that in Sketch out a Morselize API #20820)
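The intended phase split can be sketched like this (a toy model with hypothetical types; the real planner buffers the bytes for one row group via the arrow-rs async reader rather than slicing an in-memory file):

```rust
// Sketch: the planner does the IO (fetching and buffering all bytes the
// morsel will need), so the morsel's decode step is pure CPU over data
// already in memory. Types and the byte-sum "decode" are stand-ins.

struct MorselPlanner {
    range: std::ops::Range<usize>, // e.g. one row group's byte range
}

struct Morsel {
    buffered: Vec<u8>, // all bytes needed, fetched up front
}

impl MorselPlanner {
    /// IO phase: fetch and buffer the bytes this morsel will need.
    fn plan(self, file: &[u8]) -> Morsel {
        Morsel { buffered: file[self.range].to_vec() }
    }
}

impl Morsel {
    /// CPU phase: decode from the buffer only; no IO happens here.
    fn decode(&self) -> usize {
        self.buffered.iter().map(|b| *b as usize).sum()
    }
}
```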
Force-pushed 8985e37 to 4084de9
…er` (#21327)

~(Draft until I am sure I can use this API to make FileStream behave better)~

## Which issue does this PR close?

- part of #20529
- Needed for #21351
- Broken out of #20820
- Closes #21427

## Rationale for this change

I can get 10% faster on many ClickBench queries by reordering files at runtime. You can see it all working together here: #21351

To do so, I need to rework the FileStream so that it can reorder operations at runtime. Eventually that will include both CPU and IO. This PR is a step in that direction by introducing the main Morsel API and implementing it for Parquet. The next PR (#21342) rewrites FileStream in terms of the Morsel API.

## What changes are included in this PR?

1. Add proposed `Morsel` API
2. Rewrite Parquet opener in terms of that API
3. Add an adapter layer (back to FileOpener, so I don't have to rewrite FileStream in the same PR)

My next PR will rewrite the FileStream to use the Morsel API.

## Are these changes tested?

Yes, by existing CI. I will work on adding additional tests for just the Parquet opener in a follow-on PR.

## Are there any user-facing changes?

No
Stacked on ParquetOpener to ParquetMorselizer #21327

Which issue does this PR close?
Rationale for this change
The Morsel API allows for finer-grained parallelism (and IO). It is important to have the FileStream work in terms of the Morsel API to allow future features (like work stealing, etc.)
What changes are included in this PR?
Are these changes tested?
Yes by existing functional and benchmark tests, as well as new functional tests
Are there any user-facing changes?
No (not yet)