Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

257 changes: 96 additions & 161 deletions datafusion/datasource-parquet/src/opener.rs

Large diffs are not rendered by default.

83 changes: 46 additions & 37 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ use std::sync::Arc;

use crate::DefaultParquetFileReaderFactory;
use crate::ParquetFileReaderFactory;
use crate::opener::ParquetMorselizer;
use crate::opener::build_pruning_predicates;
use crate::opener::{ParquetMorselizer, ParquetOpener};
use crate::row_filter::can_expr_be_pushed_down_with_schemas;
use datafusion_common::config::ConfigOptions;
#[cfg(feature = "parquet_encryption")]
use datafusion_common::config::EncryptionFactoryOptions;
use datafusion_datasource::as_file_source;
use datafusion_datasource::file_stream::FileOpener;
use datafusion_datasource::morsel::Morselizer;

use arrow::datatypes::TimeUnit;
use datafusion_common::DataFusionError;
Expand Down Expand Up @@ -246,12 +247,12 @@ use parquet::encryption::decrypt::FileDecryptionProperties;
/// # Execution Overview
///
/// * Step 1: `DataSourceExec::execute` is called, returning a `FileStream`
/// configured to open parquet files with a `ParquetOpener`.
/// configured to morselize parquet files with a `ParquetMorselizer`.
///
/// * Step 2: When the stream is polled, the `ParquetOpener` is called to open
/// the file.
/// * Step 2: When the stream is polled, the `ParquetMorselizer` is called to
/// plan the file.
///
/// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata)
/// * Step 3: The `ParquetMorselizer` gets the [`ParquetMetaData`] (file metadata)
/// via [`ParquetFileReaderFactory`], creating a `ParquetAccessPlan` by
/// applying predicates to metadata. The plan and projections are used to
/// determine what pages must be read.
Expand Down Expand Up @@ -511,11 +512,22 @@ impl From<ParquetSource> for Arc<dyn FileSource> {

impl FileSource for ParquetSource {
fn create_file_opener(
&self,
_object_store: Arc<dyn ObjectStore>,
_base_config: &FileScanConfig,
_partition: usize,
) -> datafusion_common::Result<Arc<dyn FileOpener>> {
datafusion_common::internal_err!(
"ParquetSource::create_file_opener called but it supports the Morsel API, please use that instead"
)
}

fn create_morselizer(
&self,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
partition: usize,
) -> datafusion_common::Result<Arc<dyn FileOpener>> {
) -> datafusion_common::Result<Box<dyn Morselizer>> {
let expr_adapter_factory = base_config
.expr_adapter_factory
.clone()
Expand All @@ -542,37 +554,34 @@ impl FileSource for ParquetSource {
.as_ref()
.map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap());

let opener = Arc::new(ParquetOpener {
morselizer: ParquetMorselizer {
partition_index: partition,
projection: self.projection.clone(),
batch_size: self
.batch_size
.expect("Batch size must set before creating ParquetOpener"),
limit: base_config.limit,
preserve_order: base_config.preserve_order,
predicate: self.predicate.clone(),
table_schema: self.table_schema.clone(),
metadata_size_hint: self.metadata_size_hint,
metrics: self.metrics().clone(),
parquet_file_reader_factory,
pushdown_filters: self.pushdown_filters(),
reorder_filters: self.reorder_filters(),
force_filter_selections: self.force_filter_selections(),
enable_page_index: self.enable_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
coerce_int96,
#[cfg(feature = "parquet_encryption")]
file_decryption_properties,
expr_adapter_factory,
#[cfg(feature = "parquet_encryption")]
encryption_factory: self.get_encryption_factory_with_config(),
max_predicate_cache_size: self.max_predicate_cache_size(),
reverse_row_groups: self.reverse_row_groups,
},
});
Ok(opener)
Ok(Box::new(ParquetMorselizer {
partition_index: partition,
projection: self.projection.clone(),
batch_size: self
.batch_size
.expect("Batch size must set before creating ParquetMorselizer"),
limit: base_config.limit,
preserve_order: base_config.preserve_order,
predicate: self.predicate.clone(),
table_schema: self.table_schema.clone(),
metadata_size_hint: self.metadata_size_hint,
metrics: self.metrics().clone(),
parquet_file_reader_factory,
pushdown_filters: self.pushdown_filters(),
reorder_filters: self.reorder_filters(),
force_filter_selections: self.force_filter_selections(),
enable_page_index: self.enable_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
coerce_int96,
#[cfg(feature = "parquet_encryption")]
file_decryption_properties,
expr_adapter_factory,
#[cfg(feature = "parquet_encryption")]
encryption_factory: self.get_encryption_factory_with_config(),
max_predicate_cache_size: self.max_predicate_cache_size(),
reverse_row_groups: self.reverse_row_groups,
}))
}

fn as_any(&self) -> &dyn Any {
Expand Down
2 changes: 2 additions & 0 deletions datafusion/datasource/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ version.workspace = true
all-features = true

[features]
backtrace = ["datafusion-common/backtrace"]
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this while debugging why the tests failed on CI and not locally (when this feature flag was on, the error messages got mangled).

I added a crate level feature to enable the feature in datafusion-common so I could reproduce the error locally

compression = ["async-compression", "liblzma", "bzip2", "flate2", "zstd", "tokio-util"]
default = ["compression"]

Expand Down Expand Up @@ -72,6 +73,7 @@ zstd = { workspace = true, optional = true }

[dev-dependencies]
criterion = { workspace = true }
insta = { workspace = true }
tempfile = { workspace = true }

# Note: add additional linter rules in lib.rs.
Expand Down
23 changes: 22 additions & 1 deletion datafusion/datasource/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::sync::Arc;
use crate::file_groups::FileGroupPartitioner;
use crate::file_scan_config::FileScanConfig;
use crate::file_stream::FileOpener;
use crate::morsel::{FileOpenerMorselizer, Morselizer};
#[expect(deprecated)]
use crate::schema_adapter::SchemaAdapterFactory;
use datafusion_common::config::ConfigOptions;
Expand Down Expand Up @@ -63,13 +64,33 @@ pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource>
///
/// [`DataSource`]: crate::source::DataSource
pub trait FileSource: Send + Sync {
/// Creates a `dyn FileOpener` based on given parameters
/// Creates a `dyn FileOpener` based on given parameters.
///
/// Note: File sources with a native morsel implementation should return an
/// error from this method and implement [`Self::create_morselizer`] instead.
fn create_file_opener(
&self,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
partition: usize,
) -> Result<Arc<dyn FileOpener>>;

/// Creates a `dyn Morselizer` based on given parameters.
///
/// The default implementation preserves existing behavior by adapting the
/// legacy [`FileOpener`] API into a [`Morselizer`].
///
/// It is preferred to implement the [`Morselizer`] API directly by
/// implementing this method.
fn create_morselizer(
&self,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
partition: usize,
) -> Result<Box<dyn Morselizer>> {
let opener = self.create_file_opener(object_store, base_config, partition)?;
Ok(Box::new(FileOpenerMorselizer::new(opener)))
}
Comment on lines +78 to +93
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀

/// Any
fn as_any(&self) -> &dyn Any;

Expand Down
4 changes: 2 additions & 2 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,11 +587,11 @@ impl DataSource for FileScanConfig {

let source = self.file_source.with_batch_size(batch_size);

let opener = source.create_file_opener(object_store, self, partition)?;
let morselizer = source.create_morselizer(object_store, self, partition)?;

let stream = FileStreamBuilder::new(self)
.with_partition(partition)
.with_file_opener(opener)
.with_morselizer(morselizer)
.with_metrics(source.metrics())
.build()?;
Ok(Box::pin(cooperative(stream)))
Expand Down
40 changes: 28 additions & 12 deletions datafusion/datasource/src/file_stream/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
use std::sync::Arc;

use crate::file_scan_config::FileScanConfig;
use crate::file_stream::scan_state::ScanState;
use crate::morsel::{FileOpenerMorselizer, Morselizer};
use datafusion_common::{Result, internal_err};
use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};

Expand All @@ -28,7 +30,7 @@ use super::{FileOpener, FileStream, FileStreamState, OnError};
pub struct FileStreamBuilder<'a> {
config: &'a FileScanConfig,
partition: Option<usize>,
file_opener: Option<Arc<dyn FileOpener>>,
morselizer: Option<Box<dyn Morselizer>>,
metrics: Option<&'a ExecutionPlanMetricsSet>,
on_error: OnError,
}
Expand All @@ -39,7 +41,7 @@ impl<'a> FileStreamBuilder<'a> {
Self {
config,
partition: None,
file_opener: None,
morselizer: None,
metrics: None,
on_error: OnError::Fail,
}
Expand All @@ -52,8 +54,18 @@ impl<'a> FileStreamBuilder<'a> {
}

/// Configure the [`FileOpener`] used to open files.
///
/// This will overwrite any setting from [`Self::with_morselizer`]
pub fn with_file_opener(mut self, file_opener: Arc<dyn FileOpener>) -> Self {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I think it could make sense to keep FileOpener as a public API for building data sources (if we consider it simpler, for folks who don't care about perf), this method in particular seems like a mostly internal method (even if it is pub) on we might as well deprecate / remove.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is the way we could keep using FileOpener (as it is simpler)

I am not sure how we could still allow using FileOpener but not keep this method

self.file_opener = Some(file_opener);
self.morselizer = Some(Box::new(FileOpenerMorselizer::new(file_opener)));
self
}

/// Configure the [`Morselizer`] used to open files.
///
/// This will overwrite any setting from [`Self::with_file_opener`]
pub fn with_morselizer(mut self, morselizer: Box<dyn Morselizer>) -> Self {
self.morselizer = Some(morselizer);
self
}

Expand All @@ -74,16 +86,16 @@ impl<'a> FileStreamBuilder<'a> {
let Self {
config,
partition,
file_opener,
morselizer,
metrics,
on_error,
} = self;

let Some(partition) = partition else {
return internal_err!("FileStreamBuilder missing required partition");
};
let Some(file_opener) = file_opener else {
return internal_err!("FileStreamBuilder missing required file_opener");
let Some(morselizer) = morselizer else {
return internal_err!("FileStreamBuilder missing required morselizer");
};
let Some(metrics) = metrics else {
return internal_err!("FileStreamBuilder missing required metrics");
Expand All @@ -95,15 +107,19 @@ impl<'a> FileStreamBuilder<'a> {
);
};

let file_stream_metrics = FileStreamMetrics::new(metrics, partition);
let scan_state = Box::new(ScanState::new(
file_group.into_inner(),
config.limit,
morselizer,
on_error,
file_stream_metrics,
));

Ok(FileStream {
file_iter: file_group.into_inner().into_iter().collect(),
projected_schema,
remain: config.limit,
file_opener,
state: FileStreamState::Idle,
file_stream_metrics: FileStreamMetrics::new(metrics, partition),
state: FileStreamState::Scan { scan_state },
baseline_metrics: BaselineMetrics::new(metrics, partition),
on_error,
})
}
}
9 changes: 3 additions & 6 deletions datafusion/datasource/src/file_stream/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub struct FileStreamMetrics {
/// Wall clock time elapsed for data decompression + decoding
///
/// Time spent waiting for the FileStream's input.
pub time_processing: StartableTime,
pub time_processing: Time,
/// Count of errors opening file.
///
/// If using `OnError::Skip` this will provide a count of the number of files
Expand Down Expand Up @@ -126,11 +126,8 @@ impl FileStreamMetrics {
start: None,
};

let time_processing = StartableTime {
metrics: MetricBuilder::new(metrics)
.subset_time("time_elapsed_processing", partition),
start: None,
};
let time_processing =
MetricBuilder::new(metrics).subset_time("time_elapsed_processing", partition);

let file_open_errors = MetricBuilder::new(metrics)
.with_category(MetricCategory::Rows)
Expand Down
Loading
Loading