-
Notifications
You must be signed in to change notification settings - Fork 2k
Rewrite FileStream in terms of Morsel API #21342
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ use std::sync::Arc; | |
| use crate::file_groups::FileGroupPartitioner; | ||
| use crate::file_scan_config::FileScanConfig; | ||
| use crate::file_stream::FileOpener; | ||
| use crate::morsel::{FileOpenerMorselizer, Morselizer}; | ||
| #[expect(deprecated)] | ||
| use crate::schema_adapter::SchemaAdapterFactory; | ||
| use datafusion_common::config::ConfigOptions; | ||
|
|
@@ -63,13 +64,33 @@ pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource> | |
| /// | ||
| /// [`DataSource`]: crate::source::DataSource | ||
| pub trait FileSource: Send + Sync { | ||
| /// Creates a `dyn FileOpener` based on given parameters | ||
| /// Creates a `dyn FileOpener` based on given parameters. | ||
| /// | ||
| /// Note: File sources with a native morsel implementation should return an | ||
| /// error from this method and implement [`Self::create_morselizer`] instead. | ||
| fn create_file_opener( | ||
| &self, | ||
| object_store: Arc<dyn ObjectStore>, | ||
| base_config: &FileScanConfig, | ||
| partition: usize, | ||
| ) -> Result<Arc<dyn FileOpener>>; | ||
|
|
||
| /// Creates a `dyn Morselizer` based on given parameters. | ||
| /// | ||
| /// The default implementation preserves existing behavior by adapting the | ||
| /// legacy [`FileOpener`] API into a [`Morselizer`]. | ||
| /// | ||
| /// It is preferred to implement the [`Morselizer`] API directly by | ||
| /// implementing this method. | ||
| fn create_morselizer( | ||
| &self, | ||
| object_store: Arc<dyn ObjectStore>, | ||
| base_config: &FileScanConfig, | ||
| partition: usize, | ||
| ) -> Result<Box<dyn Morselizer>> { | ||
| let opener = self.create_file_opener(object_store, base_config, partition)?; | ||
| Ok(Box::new(FileOpenerMorselizer::new(opener))) | ||
| } | ||
|
Comment on lines
+78
to
+93
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🚀 |
||
| /// Any | ||
| fn as_any(&self) -> &dyn Any; | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,8 @@ | |
| use std::sync::Arc; | ||
|
|
||
| use crate::file_scan_config::FileScanConfig; | ||
| use crate::file_stream::scan_state::ScanState; | ||
| use crate::morsel::{FileOpenerMorselizer, Morselizer}; | ||
| use datafusion_common::{Result, internal_err}; | ||
| use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; | ||
|
|
||
|
|
@@ -28,7 +30,7 @@ use super::{FileOpener, FileStream, FileStreamState, OnError}; | |
| pub struct FileStreamBuilder<'a> { | ||
| config: &'a FileScanConfig, | ||
| partition: Option<usize>, | ||
| file_opener: Option<Arc<dyn FileOpener>>, | ||
| morselizer: Option<Box<dyn Morselizer>>, | ||
| metrics: Option<&'a ExecutionPlanMetricsSet>, | ||
| on_error: OnError, | ||
| } | ||
|
|
@@ -39,7 +41,7 @@ impl<'a> FileStreamBuilder<'a> { | |
| Self { | ||
| config, | ||
| partition: None, | ||
| file_opener: None, | ||
| morselizer: None, | ||
| metrics: None, | ||
| on_error: OnError::Fail, | ||
| } | ||
|
|
@@ -52,8 +54,18 @@ impl<'a> FileStreamBuilder<'a> { | |
| } | ||
|
|
||
| /// Configure the [`FileOpener`] used to open files. | ||
| /// | ||
| /// This will overwrite any setting from [`Self::with_morselizer`]. | ||
| pub fn with_file_opener(mut self, file_opener: Arc<dyn FileOpener>) -> Self { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. While I think it could make sense to keep
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This method is the way we could keep using FileOpener (as it is simpler). I am not sure how we could still allow using FileOpener but not keep this method. |
||
| self.file_opener = Some(file_opener); | ||
| self.morselizer = Some(Box::new(FileOpenerMorselizer::new(file_opener))); | ||
| self | ||
| } | ||
|
|
||
| /// Configure the [`Morselizer`] used to open files. | ||
| /// | ||
| /// This will overwrite any setting from [`Self::with_file_opener`]. | ||
| pub fn with_morselizer(mut self, morselizer: Box<dyn Morselizer>) -> Self { | ||
| self.morselizer = Some(morselizer); | ||
| self | ||
| } | ||
|
|
||
|
|
@@ -74,16 +86,16 @@ impl<'a> FileStreamBuilder<'a> { | |
| let Self { | ||
| config, | ||
| partition, | ||
| file_opener, | ||
| morselizer, | ||
| metrics, | ||
| on_error, | ||
| } = self; | ||
|
|
||
| let Some(partition) = partition else { | ||
| return internal_err!("FileStreamBuilder missing required partition"); | ||
| }; | ||
| let Some(file_opener) = file_opener else { | ||
| return internal_err!("FileStreamBuilder missing required file_opener"); | ||
| let Some(morselizer) = morselizer else { | ||
| return internal_err!("FileStreamBuilder missing required morselizer"); | ||
| }; | ||
| let Some(metrics) = metrics else { | ||
| return internal_err!("FileStreamBuilder missing required metrics"); | ||
|
|
@@ -95,15 +107,19 @@ impl<'a> FileStreamBuilder<'a> { | |
| ); | ||
| }; | ||
|
|
||
| let file_stream_metrics = FileStreamMetrics::new(metrics, partition); | ||
| let scan_state = Box::new(ScanState::new( | ||
| file_group.into_inner(), | ||
| config.limit, | ||
| morselizer, | ||
| on_error, | ||
| file_stream_metrics, | ||
| )); | ||
|
|
||
| Ok(FileStream { | ||
| file_iter: file_group.into_inner().into_iter().collect(), | ||
| projected_schema, | ||
| remain: config.limit, | ||
| file_opener, | ||
| state: FileStreamState::Idle, | ||
| file_stream_metrics: FileStreamMetrics::new(metrics, partition), | ||
| state: FileStreamState::Scan { scan_state }, | ||
| baseline_metrics: BaselineMetrics::new(metrics, partition), | ||
| on_error, | ||
| }) | ||
| } | ||
| } | ||
alamb marked this conversation as resolved.
Show resolved
Hide resolved
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added this while debugging why the tests failed on CI but not locally (when this feature flag was on, the error messages got mangled).
I added a crate level feature to enable the feature in datafusion-common so I could reproduce the error locally