Skip to content

Commit 8cd86f8

Browse files
committed
Rewrite FileStream in terms of Morselize API
1 parent 249c23c commit 8cd86f8

File tree

13 files changed

+1562
-342
lines changed

13 files changed

+1562
-342
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/datasource-parquet/src/opener.rs

Lines changed: 96 additions & 161 deletions
Large diffs are not rendered by default.

datafusion/datasource-parquet/src/source.rs

Lines changed: 46 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,15 @@ use std::sync::Arc;
2323

2424
use crate::DefaultParquetFileReaderFactory;
2525
use crate::ParquetFileReaderFactory;
26+
use crate::opener::ParquetMorselizer;
2627
use crate::opener::build_pruning_predicates;
27-
use crate::opener::{ParquetMorselizer, ParquetOpener};
2828
use crate::row_filter::can_expr_be_pushed_down_with_schemas;
2929
use datafusion_common::config::ConfigOptions;
3030
#[cfg(feature = "parquet_encryption")]
3131
use datafusion_common::config::EncryptionFactoryOptions;
3232
use datafusion_datasource::as_file_source;
3333
use datafusion_datasource::file_stream::FileOpener;
34+
use datafusion_datasource::morsel::Morselizer;
3435

3536
use arrow::datatypes::TimeUnit;
3637
use datafusion_common::DataFusionError;
@@ -246,12 +247,12 @@ use parquet::encryption::decrypt::FileDecryptionProperties;
246247
/// # Execution Overview
247248
///
248249
/// * Step 1: `DataSourceExec::execute` is called, returning a `FileStream`
249-
/// configured to open parquet files with a `ParquetOpener`.
250+
/// configured to morselize parquet files with a `ParquetMorselizer`.
250251
///
251-
/// * Step 2: When the stream is polled, the `ParquetOpener` is called to open
252-
/// the file.
252+
/// * Step 2: When the stream is polled, the `ParquetMorselizer` is called to
253+
/// plan the file.
253254
///
254-
/// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata)
255+
/// * Step 3: The `ParquetMorselizer` gets the [`ParquetMetaData`] (file metadata)
255256
/// via [`ParquetFileReaderFactory`], creating a `ParquetAccessPlan` by
256257
/// applying predicates to metadata. The plan and projections are used to
257258
/// determine what pages must be read.
@@ -511,11 +512,22 @@ impl From<ParquetSource> for Arc<dyn FileSource> {
511512

512513
impl FileSource for ParquetSource {
513514
fn create_file_opener(
515+
&self,
516+
_object_store: Arc<dyn ObjectStore>,
517+
_base_config: &FileScanConfig,
518+
_partition: usize,
519+
) -> datafusion_common::Result<Arc<dyn FileOpener>> {
520+
datafusion_common::internal_err!(
521+
"ParquetSource::create_file_opener called but it supports the Morsel API"
522+
)
523+
}
524+
525+
fn create_morselizer(
514526
&self,
515527
object_store: Arc<dyn ObjectStore>,
516528
base_config: &FileScanConfig,
517529
partition: usize,
518-
) -> datafusion_common::Result<Arc<dyn FileOpener>> {
530+
) -> datafusion_common::Result<Box<dyn Morselizer>> {
519531
let expr_adapter_factory = base_config
520532
.expr_adapter_factory
521533
.clone()
@@ -542,37 +554,34 @@ impl FileSource for ParquetSource {
542554
.as_ref()
543555
.map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap());
544556

545-
let opener = Arc::new(ParquetOpener {
546-
morselizer: ParquetMorselizer {
547-
partition_index: partition,
548-
projection: self.projection.clone(),
549-
batch_size: self
550-
.batch_size
551-
.expect("Batch size must set before creating ParquetOpener"),
552-
limit: base_config.limit,
553-
preserve_order: base_config.preserve_order,
554-
predicate: self.predicate.clone(),
555-
table_schema: self.table_schema.clone(),
556-
metadata_size_hint: self.metadata_size_hint,
557-
metrics: self.metrics().clone(),
558-
parquet_file_reader_factory,
559-
pushdown_filters: self.pushdown_filters(),
560-
reorder_filters: self.reorder_filters(),
561-
force_filter_selections: self.force_filter_selections(),
562-
enable_page_index: self.enable_page_index(),
563-
enable_bloom_filter: self.bloom_filter_on_read(),
564-
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
565-
coerce_int96,
566-
#[cfg(feature = "parquet_encryption")]
567-
file_decryption_properties,
568-
expr_adapter_factory,
569-
#[cfg(feature = "parquet_encryption")]
570-
encryption_factory: self.get_encryption_factory_with_config(),
571-
max_predicate_cache_size: self.max_predicate_cache_size(),
572-
reverse_row_groups: self.reverse_row_groups,
573-
},
574-
});
575-
Ok(opener)
557+
Ok(Box::new(ParquetMorselizer {
558+
partition_index: partition,
559+
projection: self.projection.clone(),
560+
batch_size: self
561+
.batch_size
562+
.expect("Batch size must be set before creating ParquetMorselizer"),
563+
limit: base_config.limit,
564+
preserve_order: base_config.preserve_order,
565+
predicate: self.predicate.clone(),
566+
table_schema: self.table_schema.clone(),
567+
metadata_size_hint: self.metadata_size_hint,
568+
metrics: self.metrics().clone(),
569+
parquet_file_reader_factory,
570+
pushdown_filters: self.pushdown_filters(),
571+
reorder_filters: self.reorder_filters(),
572+
force_filter_selections: self.force_filter_selections(),
573+
enable_page_index: self.enable_page_index(),
574+
enable_bloom_filter: self.bloom_filter_on_read(),
575+
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
576+
coerce_int96,
577+
#[cfg(feature = "parquet_encryption")]
578+
file_decryption_properties,
579+
expr_adapter_factory,
580+
#[cfg(feature = "parquet_encryption")]
581+
encryption_factory: self.get_encryption_factory_with_config(),
582+
max_predicate_cache_size: self.max_predicate_cache_size(),
583+
reverse_row_groups: self.reverse_row_groups,
584+
}))
576585
}
577586

578587
fn as_any(&self) -> &dyn Any {

datafusion/datasource/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ version.workspace = true
3131
all-features = true
3232

3333
[features]
34+
backtrace = ["datafusion-common/backtrace"]
3435
compression = ["async-compression", "liblzma", "bzip2", "flate2", "zstd", "tokio-util"]
3536
default = ["compression"]
3637

@@ -72,6 +73,7 @@ zstd = { workspace = true, optional = true }
7273

7374
[dev-dependencies]
7475
criterion = { workspace = true }
76+
insta = { workspace = true }
7577
tempfile = { workspace = true }
7678

7779
# Note: add additional linter rules in lib.rs.

datafusion/datasource/src/file.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use std::sync::Arc;
2525
use crate::file_groups::FileGroupPartitioner;
2626
use crate::file_scan_config::FileScanConfig;
2727
use crate::file_stream::FileOpener;
28+
use crate::morsel::{FileOpenerMorselizer, Morselizer};
2829
#[expect(deprecated)]
2930
use crate::schema_adapter::SchemaAdapterFactory;
3031
use datafusion_common::config::ConfigOptions;
@@ -63,13 +64,33 @@ pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource>
6364
///
6465
/// [`DataSource`]: crate::source::DataSource
6566
pub trait FileSource: Send + Sync {
66-
/// Creates a `dyn FileOpener` based on given parameters
67+
/// Creates a `dyn FileOpener` based on given parameters.
68+
///
69+
/// Note: File sources with a native morsel implementation should return an
70+
/// error from this method and implement [`Self::create_morselizer`] instead.
6771
fn create_file_opener(
6872
&self,
6973
object_store: Arc<dyn ObjectStore>,
7074
base_config: &FileScanConfig,
7175
partition: usize,
7276
) -> Result<Arc<dyn FileOpener>>;
77+
78+
/// Creates a `dyn Morselizer` based on given parameters.
79+
///
80+
/// The default implementation preserves existing behavior by adapting the
81+
/// legacy [`FileOpener`] API into a [`Morselizer`].
82+
///
83+
/// It is preferred to implement the [`Morselizer`] API directly by
84+
/// implementing this method.
85+
fn create_morselizer(
86+
&self,
87+
object_store: Arc<dyn ObjectStore>,
88+
base_config: &FileScanConfig,
89+
partition: usize,
90+
) -> Result<Box<dyn Morselizer>> {
91+
let opener = self.create_file_opener(object_store, base_config, partition)?;
92+
Ok(Box::new(FileOpenerMorselizer::new(opener)))
93+
}
7394
/// Any
7495
fn as_any(&self) -> &dyn Any;
7596

datafusion/datasource/src/file_scan_config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -587,11 +587,11 @@ impl DataSource for FileScanConfig {
587587

588588
let source = self.file_source.with_batch_size(batch_size);
589589

590-
let opener = source.create_file_opener(object_store, self, partition)?;
590+
let morselizer = source.create_morselizer(object_store, self, partition)?;
591591

592592
let stream = FileStreamBuilder::new(self)
593593
.with_partition(partition)
594-
.with_file_opener(opener)
594+
.with_morselizer(morselizer)
595595
.with_metrics(source.metrics())
596596
.build()?;
597597
Ok(Box::pin(cooperative(stream)))

datafusion/datasource/src/file_stream/builder.rs

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
use std::sync::Arc;
1919

2020
use crate::file_scan_config::FileScanConfig;
21+
use crate::file_stream::scan_state::ScanState;
22+
use crate::morsel::{FileOpenerMorselizer, Morselizer};
2123
use datafusion_common::{Result, internal_err};
2224
use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
2325

@@ -28,7 +30,7 @@ use super::{FileOpener, FileStream, FileStreamState, OnError};
2830
pub struct FileStreamBuilder<'a> {
2931
config: &'a FileScanConfig,
3032
partition: Option<usize>,
31-
file_opener: Option<Arc<dyn FileOpener>>,
33+
morselizer: Option<Box<dyn Morselizer>>,
3234
metrics: Option<&'a ExecutionPlanMetricsSet>,
3335
on_error: OnError,
3436
}
@@ -39,7 +41,7 @@ impl<'a> FileStreamBuilder<'a> {
3941
Self {
4042
config,
4143
partition: None,
42-
file_opener: None,
44+
morselizer: None,
4345
metrics: None,
4446
on_error: OnError::Fail,
4547
}
@@ -52,8 +54,18 @@ impl<'a> FileStreamBuilder<'a> {
5254
}
5355

5456
/// Configure the [`FileOpener`] used to open files.
57+
///
58+
/// This will overwrite any setting from [`Self::with_morselizer`]
5559
pub fn with_file_opener(mut self, file_opener: Arc<dyn FileOpener>) -> Self {
56-
self.file_opener = Some(file_opener);
60+
self.morselizer = Some(Box::new(FileOpenerMorselizer::new(file_opener)));
61+
self
62+
}
63+
64+
/// Configure the [`Morselizer`] used to open files.
65+
///
66+
/// This will overwrite any setting from [`Self::with_file_opener`]
67+
pub fn with_morselizer(mut self, morselizer: Box<dyn Morselizer>) -> Self {
68+
self.morselizer = Some(morselizer);
5769
self
5870
}
5971

@@ -74,16 +86,16 @@ impl<'a> FileStreamBuilder<'a> {
7486
let Self {
7587
config,
7688
partition,
77-
file_opener,
89+
morselizer,
7890
metrics,
7991
on_error,
8092
} = self;
8193

8294
let Some(partition) = partition else {
8395
return internal_err!("FileStreamBuilder missing required partition");
8496
};
85-
let Some(file_opener) = file_opener else {
86-
return internal_err!("FileStreamBuilder missing required file_opener");
97+
let Some(morselizer) = morselizer else {
98+
return internal_err!("FileStreamBuilder missing required morselizer");
8799
};
88100
let Some(metrics) = metrics else {
89101
return internal_err!("FileStreamBuilder missing required metrics");
@@ -95,15 +107,19 @@ impl<'a> FileStreamBuilder<'a> {
95107
);
96108
};
97109

110+
let file_stream_metrics = FileStreamMetrics::new(metrics, partition);
111+
let scan_state = Box::new(ScanState::new(
112+
file_group.into_inner(),
113+
config.limit,
114+
morselizer,
115+
on_error,
116+
file_stream_metrics,
117+
));
118+
98119
Ok(FileStream {
99-
file_iter: file_group.into_inner().into_iter().collect(),
100120
projected_schema,
101-
remain: config.limit,
102-
file_opener,
103-
state: FileStreamState::Idle,
104-
file_stream_metrics: FileStreamMetrics::new(metrics, partition),
121+
state: FileStreamState::Scan { scan_state },
105122
baseline_metrics: BaselineMetrics::new(metrics, partition),
106-
on_error,
107123
})
108124
}
109125
}

datafusion/datasource/src/file_stream/metrics.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ pub struct FileStreamMetrics {
7777
/// Wall clock time elapsed for data decompression + decoding
7878
///
7979
/// Time spent waiting for the FileStream's input.
80-
pub time_processing: StartableTime,
80+
pub time_processing: Time,
8181
/// Count of errors opening file.
8282
///
8383
/// If using `OnError::Skip` this will provide a count of the number of files
@@ -126,11 +126,8 @@ impl FileStreamMetrics {
126126
start: None,
127127
};
128128

129-
let time_processing = StartableTime {
130-
metrics: MetricBuilder::new(metrics)
131-
.subset_time("time_elapsed_processing", partition),
132-
start: None,
133-
};
129+
let time_processing =
130+
MetricBuilder::new(metrics).subset_time("time_elapsed_processing", partition);
134131

135132
let file_open_errors = MetricBuilder::new(metrics)
136133
.with_category(MetricCategory::Rows)

0 commit comments

Comments
 (0)