diff --git a/Cargo.lock b/Cargo.lock index 87c18826096c..3f7ab2ebaa7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1963,6 +1963,7 @@ dependencies = [ "flate2", "futures", "glob", + "insta", "itertools 0.14.0", "liblzma", "log", diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 35900e16c18e..21a383c6dabb 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! [`ParquetOpener`] and [`ParquetMorselizer`] state machines for opening Parquet files +//! [`ParquetMorselizer`] state machines for opening Parquet files use crate::page_filter::PagePruningAccessPlanFilter; use crate::row_filter::build_projection_read_plan; @@ -26,15 +26,11 @@ use crate::{ }; use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; -use datafusion_common::internal_err; -use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; -use datafusion_datasource::morsel::{ - Morsel, MorselPlan, MorselPlanner, Morselizer, PendingMorselPlanner, -}; +use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; use datafusion_physical_expr::projection::{ProjectionExprs, Projector}; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr_adapter::replace_columns_with_literals; -use std::collections::{HashMap, VecDeque}; +use std::collections::HashMap; use std::fmt; use std::future::Future; use std::mem; @@ -82,19 +78,6 @@ use parquet::bloom_filter::Sbbf; use parquet::errors::ParquetError; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; -/// Implements [`FileOpener`] for Parquet -#[derive(Clone)] -pub(super) struct ParquetOpener { - pub(super) morselizer: ParquetMorselizer, -} - -impl FileOpener for ParquetOpener { - fn open(&self, partitioned_file: PartitionedFile) -> Result { - let 
future = ParquetOpenFuture::new(&self.morselizer, partitioned_file)?; - Ok(Box::pin(future)) - } -} - /// Stateless Parquet morselizer implementation. /// /// Reading a Parquet file is a multi-stage process, with multiple CPU-intensive @@ -173,7 +156,7 @@ impl Morselizer for ParquetMorselizer { } } -/// States for [`ParquetOpenFuture`] +/// States for [`ParquetMorselPlanner`] /// /// These states correspond to the steps required to read and apply various /// filter operations. @@ -425,85 +408,6 @@ impl ParquetOpenState { } } -/// Adapter for a [`MorselPlanner`] to the [`FileOpener`] API -/// -/// Compatibility adapter that drives a morsel planner through the -/// [`FileOpener`] API. -struct ParquetOpenFuture { - planner: Option>, - pending_io: Option, - ready_morsels: VecDeque>, -} - -impl ParquetOpenFuture { - fn new( - morselizer: &ParquetMorselizer, - partitioned_file: PartitionedFile, - ) -> Result { - Ok(Self { - planner: Some(morselizer.plan_file(partitioned_file)?), - pending_io: None, - ready_morsels: VecDeque::new(), - }) - } -} - -impl Future for ParquetOpenFuture { - type Output = Result>>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - // If planner I/O completed, resume with the returned planner. - if let Some(io_future) = self.pending_io.as_mut() { - let maybe_planner = ready!(io_future.poll_unpin(cx)); - // Clear `pending_io` before handling the result so an error - // cannot leave both continuation paths populated. - self.pending_io = None; - if self.planner.is_some() { - return Poll::Ready(internal_err!( - "ParquetOpenFuture does not support concurrent planners" - )); - } - self.planner = Some(maybe_planner?); - } - - // If a stream morsel is ready, return it. - if let Some(morsel) = self.ready_morsels.pop_front() { - return Poll::Ready(Ok(morsel.into_stream())); - } - - // This shim must always own either a planner, a pending planner - // future, or a ready morsel. 
Reaching this branch means the - // continuation was lost. - let Some(planner) = self.planner.take() else { - return Poll::Ready(internal_err!( - "ParquetOpenFuture polled after completion" - )); - }; - - // Planner completed without producing a stream morsel. - // (e.g. all row groups were pruned) - let Some(mut plan) = planner.plan()? else { - return Poll::Ready(Ok(futures::stream::empty().boxed())); - }; - - let mut child_planners = plan.take_ready_planners(); - if child_planners.len() > 1 { - return Poll::Ready(internal_err!( - "Parquet FileOpener adapter does not support child morsel planners" - )); - } - self.planner = child_planners.pop(); - - self.ready_morsels = plan.take_morsels().into(); - - if let Some(io_future) = plan.take_pending_planner() { - self.pending_io = Some(io_future); - } - } - } -} - /// Implements the Morsel API struct ParquetStreamMorsel { stream: BoxStream<'static, Result>, @@ -1722,17 +1626,18 @@ fn should_enable_page_index( #[cfg(test)] mod test { - use std::sync::Arc; - + use super::*; use super::{ConstantColumns, ParquetMorselizer, constant_columns_from_stats}; - use crate::{DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener}; + use crate::{DefaultParquetFileReaderFactory, RowGroupAccess}; + use arrow::array::RecordBatch; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; use datafusion_common::{ - ColumnStatistics, DataFusionError, ScalarValue, Statistics, record_batch, + ColumnStatistics, ScalarValue, Statistics, internal_err, record_batch, stats::Precision, }; - use datafusion_datasource::{PartitionedFile, TableSchema, file_stream::FileOpener}; + use datafusion_datasource::morsel::{Morsel, Morselizer}; + use datafusion_datasource::{PartitionedFile, TableSchema}; use datafusion_expr::{col, lit}; use datafusion_physical_expr::{ PhysicalExpr, @@ -1744,14 +1649,17 @@ mod test { DefaultPhysicalExprAdapterFactory, replace_columns_with_literals, }; use 
datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; - use futures::{Stream, StreamExt}; + use futures::StreamExt; + use futures::stream::BoxStream; use object_store::{ObjectStore, ObjectStoreExt, memory::InMemory, path::Path}; use parquet::arrow::ArrowWriter; use parquet::file::properties::WriterProperties; + use std::collections::VecDeque; + use std::sync::Arc; - /// Builder for creating [`ParquetOpener`] instances with sensible defaults for tests. + /// Builder for creating [`ParquetMorselizer`] instances with sensible defaults for tests. /// This helps reduce code duplication and makes it clear what differs between test cases. - struct ParquetOpenerBuilder { + struct ParquetMorselizerBuilder { store: Option>, table_schema: Option, partition_index: usize, @@ -1768,13 +1676,13 @@ mod test { enable_page_index: bool, enable_bloom_filter: bool, enable_row_group_stats_pruning: bool, - coerce_int96: Option, + coerce_int96: Option, max_predicate_cache_size: Option, reverse_row_groups: bool, preserve_order: bool, } - impl ParquetOpenerBuilder { + impl ParquetMorselizerBuilder { /// Create a new builder with sensible defaults for tests. fn new() -> Self { Self { @@ -1855,17 +1763,17 @@ mod test { self } - /// Build the ParquetOpener instance. + /// Build the ParquetMorselizer instance. /// /// # Panics /// /// Panics if required fields (store, schema/table_schema) are not set. 
- fn build(self) -> ParquetOpener { + fn build(self) -> ParquetMorselizer { let store = self .store - .expect("ParquetOpenerBuilder: store must be set via with_store()"); + .expect("ParquetMorselizerBuilder: store must be set via with_store()"); let table_schema = self.table_schema.expect( - "ParquetOpenerBuilder: table_schema must be set via with_schema() or with_table_schema()", + "ParquetMorselizerBuilder: table_schema must be set via with_schema() or with_table_schema()", ); let file_schema = Arc::clone(table_schema.file_schema()); @@ -1879,7 +1787,7 @@ mod test { ProjectionExprs::from_indices(&all_indices, &file_schema) }; - let morselizer = ParquetMorselizer { + ParquetMorselizer { partition_index: self.partition_index, projection, batch_size: self.batch_size, @@ -1906,8 +1814,45 @@ mod test { encryption_factory: None, max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, + } + } + } + + /// Test helper that drives a [`ParquetMorselizer`] to completion and returns + /// the first stream morsel it produces. + /// + /// This mirrors how `FileStream` consumes the morsel APIs: it repeatedly + /// plans CPU work, awaits any discovered I/O futures, and feeds the planner + /// back into the ready queue until a stream morsel is ready. + async fn open_file( + morselizer: &ParquetMorselizer, + file: PartitionedFile, + ) -> Result>> { + let mut planners = VecDeque::from([morselizer.plan_file(file)?]); + let mut morsels: VecDeque> = VecDeque::new(); + + loop { + if let Some(morsel) = morsels.pop_front() { + return Ok(Box::pin(morsel.into_stream())); + } + + let Some(planner) = planners.pop_front() else { + return Ok(Box::pin(futures::stream::empty())); }; - ParquetOpener { morselizer } + + if let Some(mut plan) = planner.plan()? 
{ + morsels.extend(plan.take_morsels()); + planners.extend(plan.take_ready_planners()); + + if let Some(pending_planner) = plan.take_pending_planner() { + planners.push_front(pending_planner.await?); + continue; + } + + if morsels.is_empty() && planners.is_empty() { + return internal_err!("planner returned an empty morsel plan"); + } + } } } @@ -1995,12 +1940,7 @@ mod test { } async fn count_batches_and_rows( - mut stream: std::pin::Pin< - Box< - dyn Stream> - + Send, - >, - >, + mut stream: BoxStream<'static, Result>, ) -> (usize, usize) { let mut num_batches = 0; let mut num_rows = 0; @@ -2013,12 +1953,7 @@ mod test { /// Helper to collect all int32 values from the first column of batches async fn collect_int32_values( - mut stream: std::pin::Pin< - Box< - dyn Stream> - + Send, - >, - >, + mut stream: BoxStream<'static, Result>, ) -> Vec { use arrow::array::Array; let mut values = vec![]; @@ -2104,7 +2039,7 @@ mod test { )); let make_opener = |predicate| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_schema(Arc::clone(&schema)) .with_projection_indices(&[0, 1]) @@ -2117,7 +2052,7 @@ mod test { let expr = col("a").eq(lit(1)); let predicate = logical2physical(&expr, &schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -2126,7 +2061,7 @@ mod test { let expr = col("b").eq(lit(ScalarValue::Float32(Some(5.0)))); let predicate = logical2physical(&expr, &schema); let opener = make_opener(predicate); - let stream = opener.open(file).unwrap().await.unwrap(); + let stream = open_file(&opener, file).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -2157,7 +2092,7 @@ mod test { 
vec![Arc::new(Field::new("part", DataType::Int32, false))], ); let make_opener = |predicate| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_table_schema(table_schema_for_opener.clone()) .with_projection_indices(&[0]) @@ -2172,7 +2107,7 @@ mod test { // Otherwise we assume it already happened at the planning stage and won't re-do the work here let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -2183,7 +2118,7 @@ mod test { // Otherwise we assume it already happened at the planning stage and won't re-do the work here let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); - let stream = opener.open(file).unwrap().await.unwrap(); + let stream = open_file(&opener, file).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -2226,7 +2161,7 @@ mod test { vec![Arc::new(Field::new("part", DataType::Int32, false))], ); let make_opener = |predicate| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_table_schema(table_schema_for_opener.clone()) .with_projection_indices(&[0]) @@ -2239,7 +2174,7 @@ mod test { let expr = col("part").eq(lit(1)).and(col("b").eq(lit(1.0))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -2248,7 
+2183,7 @@ mod test { let expr = col("part").eq(lit(2)).and(col("b").eq(lit(1.0))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -2257,7 +2192,7 @@ mod test { let expr = col("part").eq(lit(1)).and(col("b").eq(lit(7.0))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -2266,7 +2201,7 @@ mod test { let expr = col("part").eq(lit(2)).and(col("b").eq(lit(7.0))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file).unwrap().await.unwrap(); + let stream = open_file(&opener, file).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -2298,7 +2233,7 @@ mod test { vec![Arc::new(Field::new("part", DataType::Int32, false))], ); let make_opener = |predicate| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_table_schema(table_schema_for_opener.clone()) .with_projection_indices(&[0]) @@ -2312,7 +2247,7 @@ mod test { let expr = col("part").eq(lit(1)).or(col("a").eq(lit(1))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; 
assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -2321,7 +2256,7 @@ mod test { let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -2330,7 +2265,7 @@ mod test { let expr = col("part").eq(lit(2)).or(col("a").eq(lit(1))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 1); @@ -2339,7 +2274,7 @@ mod test { let expr = col("part").eq(lit(2)).or(col("a").eq(lit(3))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file).unwrap().await.unwrap(); + let stream = open_file(&opener, file).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -2379,7 +2314,7 @@ mod test { vec![Arc::new(Field::new("part", DataType::Int32, false))], ); let make_opener = |predicate| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_table_schema(table_schema_for_opener.clone()) .with_projection_indices(&[0]) @@ -2392,7 +2327,7 @@ mod test { let expr = col("a").eq(lit(42)); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = 
count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -2401,7 +2336,7 @@ mod test { // This allows dynamic filters to prune partitions/files even if they are populated late into execution. let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -2411,7 +2346,7 @@ mod test { let expr = col("part").eq(lit(2)); let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -2420,7 +2355,7 @@ mod test { let expr = col("part").eq(lit(2)).and(col("a").eq(lit(42))); let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -2461,7 +2396,7 @@ mod test { ); let make_opener = |reverse_scan: bool| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_schema(Arc::clone(&schema)) .with_projection_indices(&[0]) @@ -2471,12 +2406,12 @@ mod test { // Test normal scan (forward) let opener = make_opener(false); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let forward_values = 
collect_int32_values(stream).await; // Test reverse scan let opener = make_opener(true); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let reverse_values = collect_int32_values(stream).await; // The forward scan should return data in the order written @@ -2503,7 +2438,7 @@ mod test { ); let make_opener = |reverse_scan: bool| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_schema(Arc::clone(&schema)) .with_projection_indices(&[0]) @@ -2514,11 +2449,11 @@ mod test { // With a single row group, forward and reverse should be the same // (only the row group order is reversed, not the rows within) let opener_forward = make_opener(false); - let stream_forward = opener_forward.open(file.clone()).unwrap().await.unwrap(); + let stream_forward = open_file(&opener_forward, file.clone()).await.unwrap(); let (batches_forward, _) = count_batches_and_rows(stream_forward).await; let opener_reverse = make_opener(true); - let stream_reverse = opener_reverse.open(file).unwrap().await.unwrap(); + let stream_reverse = open_file(&opener_reverse, file).await.unwrap(); let (batches_reverse, _) = count_batches_and_rows(stream_reverse).await; // Both should have the same number of batches since there's only one row group @@ -2579,7 +2514,7 @@ mod test { .with_extensions(Arc::new(access_plan)); let make_opener = |reverse_scan: bool| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_schema(Arc::clone(&schema)) .with_projection_indices(&[0]) @@ -2589,7 +2524,7 @@ mod test { // Forward scan: RG0(3,4), RG1(5,6,7,8), RG2(9,10) let opener = make_opener(false); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let forward_values = collect_int32_values(stream).await; // Forward scan should produce: RG0(3,4), RG1(5,6,7,8), RG2(9,10) 
@@ -2605,7 +2540,7 @@ mod test { // - RG1 is read second, WITH RG1's selection (select all) -> 5, 6, 7, 8 // - RG0 is read third, WITH RG0's selection (skip 2, select 2) -> 3, 4 let opener = make_opener(true); - let stream = opener.open(file).unwrap().await.unwrap(); + let stream = open_file(&opener, file).await.unwrap(); let reverse_values = collect_int32_values(stream).await; // Correct expected result: row groups reversed but each keeps its own selection @@ -2680,7 +2615,7 @@ mod test { .with_extensions(Arc::new(access_plan)); let make_opener = |reverse_scan: bool| { - ParquetOpenerBuilder::new() + ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) .with_schema(Arc::clone(&schema)) .with_projection_indices(&[0]) @@ -2691,7 +2626,7 @@ mod test { // Forward scan: RG0(1), RG2(5), RG3(7) // Note: RG1 is completely skipped let opener = make_opener(false); - let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let stream = open_file(&opener, file.clone()).await.unwrap(); let forward_values = collect_int32_values(stream).await; assert_eq!( @@ -2704,7 +2639,7 @@ mod test { // WITHOUT the bug fix, this would return WRONG values // because the RowSelection would be incorrectly mapped let opener = make_opener(true); - let stream = opener.open(file).unwrap().await.unwrap(); + let stream = open_file(&opener, file).await.unwrap(); let reverse_values = collect_int32_values(stream).await; assert_eq!( diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 1e54e98dfd04..d4dc4f1400a9 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -23,14 +23,15 @@ use std::sync::Arc; use crate::DefaultParquetFileReaderFactory; use crate::ParquetFileReaderFactory; +use crate::opener::ParquetMorselizer; use crate::opener::build_pruning_predicates; -use crate::opener::{ParquetMorselizer, ParquetOpener}; use 
crate::row_filter::can_expr_be_pushed_down_with_schemas; use datafusion_common::config::ConfigOptions; #[cfg(feature = "parquet_encryption")] use datafusion_common::config::EncryptionFactoryOptions; use datafusion_datasource::as_file_source; use datafusion_datasource::file_stream::FileOpener; +use datafusion_datasource::morsel::Morselizer; use arrow::datatypes::TimeUnit; use datafusion_common::DataFusionError; @@ -246,12 +247,12 @@ use parquet::encryption::decrypt::FileDecryptionProperties; /// # Execution Overview /// /// * Step 1: `DataSourceExec::execute` is called, returning a `FileStream` -/// configured to open parquet files with a `ParquetOpener`. +/// configured to morselize parquet files with a `ParquetMorselizer`. /// -/// * Step 2: When the stream is polled, the `ParquetOpener` is called to open -/// the file. +/// * Step 2: When the stream is polled, the `ParquetMorselizer` is called to +/// plan the file. /// -/// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata) +/// * Step 3: The `ParquetMorselizer` gets the [`ParquetMetaData`] (file metadata) /// via [`ParquetFileReaderFactory`], creating a `ParquetAccessPlan` by /// applying predicates to metadata. The plan and projections are used to /// determine what pages must be read. 
@@ -511,11 +512,22 @@ impl From for Arc { impl FileSource for ParquetSource { fn create_file_opener( + &self, + _object_store: Arc, + _base_config: &FileScanConfig, + _partition: usize, + ) -> datafusion_common::Result> { + datafusion_common::internal_err!( + "ParquetSource::create_file_opener called but it supports the Morsel API, please use that instead" + ) + } + + fn create_morselizer( &self, object_store: Arc, base_config: &FileScanConfig, partition: usize, - ) -> datafusion_common::Result> { + ) -> datafusion_common::Result> { let expr_adapter_factory = base_config .expr_adapter_factory .clone() @@ -542,37 +554,34 @@ impl FileSource for ParquetSource { .as_ref() .map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap()); - let opener = Arc::new(ParquetOpener { - morselizer: ParquetMorselizer { - partition_index: partition, - projection: self.projection.clone(), - batch_size: self - .batch_size - .expect("Batch size must set before creating ParquetOpener"), - limit: base_config.limit, - preserve_order: base_config.preserve_order, - predicate: self.predicate.clone(), - table_schema: self.table_schema.clone(), - metadata_size_hint: self.metadata_size_hint, - metrics: self.metrics().clone(), - parquet_file_reader_factory, - pushdown_filters: self.pushdown_filters(), - reorder_filters: self.reorder_filters(), - force_filter_selections: self.force_filter_selections(), - enable_page_index: self.enable_page_index(), - enable_bloom_filter: self.bloom_filter_on_read(), - enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, - coerce_int96, - #[cfg(feature = "parquet_encryption")] - file_decryption_properties, - expr_adapter_factory, - #[cfg(feature = "parquet_encryption")] - encryption_factory: self.get_encryption_factory_with_config(), - max_predicate_cache_size: self.max_predicate_cache_size(), - reverse_row_groups: self.reverse_row_groups, - }, - }); - Ok(opener) + Ok(Box::new(ParquetMorselizer { + partition_index: partition, + 
projection: self.projection.clone(), + batch_size: self + .batch_size + .expect("Batch size must set before creating ParquetMorselizer"), + limit: base_config.limit, + preserve_order: base_config.preserve_order, + predicate: self.predicate.clone(), + table_schema: self.table_schema.clone(), + metadata_size_hint: self.metadata_size_hint, + metrics: self.metrics().clone(), + parquet_file_reader_factory, + pushdown_filters: self.pushdown_filters(), + reorder_filters: self.reorder_filters(), + force_filter_selections: self.force_filter_selections(), + enable_page_index: self.enable_page_index(), + enable_bloom_filter: self.bloom_filter_on_read(), + enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, + coerce_int96, + #[cfg(feature = "parquet_encryption")] + file_decryption_properties, + expr_adapter_factory, + #[cfg(feature = "parquet_encryption")] + encryption_factory: self.get_encryption_factory_with_config(), + max_predicate_cache_size: self.max_predicate_cache_size(), + reverse_row_groups: self.reverse_row_groups, + })) } fn as_any(&self) -> &dyn Any { diff --git a/datafusion/datasource/Cargo.toml b/datafusion/datasource/Cargo.toml index 1315f871a68f..402752165897 100644 --- a/datafusion/datasource/Cargo.toml +++ b/datafusion/datasource/Cargo.toml @@ -31,6 +31,7 @@ version.workspace = true all-features = true [features] +backtrace = ["datafusion-common/backtrace"] compression = ["async-compression", "liblzma", "bzip2", "flate2", "zstd", "tokio-util"] default = ["compression"] @@ -72,6 +73,7 @@ zstd = { workspace = true, optional = true } [dev-dependencies] criterion = { workspace = true } +insta = { workspace = true } tempfile = { workspace = true } # Note: add additional linter rules in lib.rs. 
diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index b5a6760cae02..9c43eeca2bc9 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use crate::file_groups::FileGroupPartitioner; use crate::file_scan_config::FileScanConfig; use crate::file_stream::FileOpener; +use crate::morsel::{FileOpenerMorselizer, Morselizer}; #[expect(deprecated)] use crate::schema_adapter::SchemaAdapterFactory; use datafusion_common::config::ConfigOptions; @@ -63,13 +64,33 @@ pub fn as_file_source(source: T) -> Arc /// /// [`DataSource`]: crate::source::DataSource pub trait FileSource: Send + Sync { - /// Creates a `dyn FileOpener` based on given parameters + /// Creates a `dyn FileOpener` based on given parameters. + /// + /// Note: File sources with a native morsel implementation should return an + /// error from this method and implement [`Self::create_morselizer`] instead. fn create_file_opener( &self, object_store: Arc, base_config: &FileScanConfig, partition: usize, ) -> Result>; + + /// Creates a `dyn Morselizer` based on given parameters. + /// + /// The default implementation preserves existing behavior by adapting the + /// legacy [`FileOpener`] API into a [`Morselizer`]. + /// + /// It is preferred to implement the [`Morselizer`] API directly by + /// overriding this method. 
+ fn create_morselizer( + &self, + object_store: Arc, + base_config: &FileScanConfig, + partition: usize, + ) -> Result> { + let opener = self.create_file_opener(object_store, base_config, partition)?; + Ok(Box::new(FileOpenerMorselizer::new(opener))) + } /// Any fn as_any(&self) -> &dyn Any; diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 976e1158f5eb..2aa5b6a88860 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -587,11 +587,11 @@ impl DataSource for FileScanConfig { let source = self.file_source.with_batch_size(batch_size); - let opener = source.create_file_opener(object_store, self, partition)?; + let morselizer = source.create_morselizer(object_store, self, partition)?; let stream = FileStreamBuilder::new(self) .with_partition(partition) - .with_file_opener(opener) + .with_morselizer(morselizer) .with_metrics(source.metrics()) .build()?; Ok(Box::pin(cooperative(stream))) diff --git a/datafusion/datasource/src/file_stream/builder.rs b/datafusion/datasource/src/file_stream/builder.rs index 6d99f4b56a8e..efe9c39ce3b3 100644 --- a/datafusion/datasource/src/file_stream/builder.rs +++ b/datafusion/datasource/src/file_stream/builder.rs @@ -18,6 +18,8 @@ use std::sync::Arc; use crate::file_scan_config::FileScanConfig; +use crate::file_stream::scan_state::ScanState; +use crate::morsel::{FileOpenerMorselizer, Morselizer}; use datafusion_common::{Result, internal_err}; use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; @@ -28,7 +30,7 @@ use super::{FileOpener, FileStream, FileStreamState, OnError}; pub struct FileStreamBuilder<'a> { config: &'a FileScanConfig, partition: Option, - file_opener: Option>, + morselizer: Option>, metrics: Option<&'a ExecutionPlanMetricsSet>, on_error: OnError, } @@ -39,7 +41,7 @@ impl<'a> FileStreamBuilder<'a> { Self { config, partition: None, - file_opener: None, + morselizer: None, 
metrics: None, on_error: OnError::Fail, } @@ -52,8 +54,18 @@ impl<'a> FileStreamBuilder<'a> { } /// Configure the [`FileOpener`] used to open files. + /// + /// This will overwrite any setting from [`Self::with_morselizer`] pub fn with_file_opener(mut self, file_opener: Arc) -> Self { - self.file_opener = Some(file_opener); + self.morselizer = Some(Box::new(FileOpenerMorselizer::new(file_opener))); + self + } + + /// Configure the [`Morselizer`] used to open files. + /// + /// This will overwrite any setting from [`Self::with_file_opener`] + pub fn with_morselizer(mut self, morselizer: Box) -> Self { + self.morselizer = Some(morselizer); self } @@ -74,7 +86,7 @@ impl<'a> FileStreamBuilder<'a> { let Self { config, partition, - file_opener, + morselizer, metrics, on_error, } = self; @@ -82,8 +94,8 @@ impl<'a> FileStreamBuilder<'a> { let Some(partition) = partition else { return internal_err!("FileStreamBuilder missing required partition"); }; - let Some(file_opener) = file_opener else { - return internal_err!("FileStreamBuilder missing required file_opener"); + let Some(morselizer) = morselizer else { + return internal_err!("FileStreamBuilder missing required morselizer"); }; let Some(metrics) = metrics else { return internal_err!("FileStreamBuilder missing required metrics"); @@ -95,15 +107,19 @@ impl<'a> FileStreamBuilder<'a> { ); }; + let file_stream_metrics = FileStreamMetrics::new(metrics, partition); + let scan_state = Box::new(ScanState::new( + file_group.into_inner(), + config.limit, + morselizer, + on_error, + file_stream_metrics, + )); + Ok(FileStream { - file_iter: file_group.into_inner().into_iter().collect(), projected_schema, - remain: config.limit, - file_opener, - state: FileStreamState::Idle, - file_stream_metrics: FileStreamMetrics::new(metrics, partition), + state: FileStreamState::Scan { scan_state }, baseline_metrics: BaselineMetrics::new(metrics, partition), - on_error, }) } } diff --git a/datafusion/datasource/src/file_stream/metrics.rs 
b/datafusion/datasource/src/file_stream/metrics.rs index f4dddeaee8d0..5f3894404f40 100644 --- a/datafusion/datasource/src/file_stream/metrics.rs +++ b/datafusion/datasource/src/file_stream/metrics.rs @@ -77,7 +77,7 @@ pub struct FileStreamMetrics { /// Wall clock time elapsed for data decompression + decoding /// /// Time spent waiting for the FileStream's input. - pub time_processing: StartableTime, + pub time_processing: Time, /// Count of errors opening file. /// /// If using `OnError::Skip` this will provide a count of the number of files @@ -126,11 +126,8 @@ impl FileStreamMetrics { start: None, }; - let time_processing = StartableTime { - metrics: MetricBuilder::new(metrics) - .subset_time("time_elapsed_processing", partition), - start: None, - }; + let time_processing = + MetricBuilder::new(metrics).subset_time("time_elapsed_processing", partition); let file_open_errors = MetricBuilder::new(metrics) .with_category(MetricCategory::Rows) diff --git a/datafusion/datasource/src/file_stream/mod.rs b/datafusion/datasource/src/file_stream/mod.rs index 33e5065cb5a3..4be2d765da3c 100644 --- a/datafusion/datasource/src/file_stream/mod.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -23,8 +23,8 @@ mod builder; mod metrics; +mod scan_state; -use std::collections::VecDeque; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -38,33 +38,24 @@ use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet use arrow::record_batch::RecordBatch; +use futures::Stream; use futures::future::BoxFuture; use futures::stream::BoxStream; -use futures::{FutureExt as _, Stream, StreamExt as _, ready}; + +use self::scan_state::{ScanAndReturn, ScanState}; pub use builder::FileStreamBuilder; pub use metrics::{FileStreamMetrics, StartableTime}; /// A stream that iterates record batch by record batch, file over file. pub struct FileStream { - /// An iterator over input files. 
- file_iter: VecDeque, /// The stream schema (file schema including partition columns and after /// projection). projected_schema: SchemaRef, - /// The remaining number of records to parse, None if no limit - remain: Option, - /// A dynamic [`FileOpener`]. Calling `open()` returns a [`FileOpenFuture`], - /// which can be resolved to a stream of `RecordBatch`. - file_opener: Arc, /// The stream state state: FileStreamState, - /// File stream specific metrics - file_stream_metrics: FileStreamMetrics, /// runtime baseline metrics baseline_metrics: BaselineMetrics, - /// Describes the behavior of the `FileStream` if file opening or scanning fails - on_error: OnError, } impl FileStream { @@ -88,105 +79,34 @@ impl FileStream { /// If `OnError::Skip` the stream will skip files which encounter an error and continue /// If `OnError:Fail` (default) the stream will fail and stop processing when an error occurs pub fn with_on_error(mut self, on_error: OnError) -> Self { - self.on_error = on_error; + match &mut self.state { + FileStreamState::Scan { scan_state } => scan_state.set_on_error(on_error), + FileStreamState::Error | FileStreamState::Done => { + // no effect as there are no more files to process + } + }; self } - fn start_next_file(&mut self) -> Option> { - let part_file = self.file_iter.pop_front()?; - Some(self.file_opener.open(part_file)) - } - fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll>> { loop { match &mut self.state { - FileStreamState::Idle => match self.start_next_file().transpose() { - Ok(Some(future)) => { - self.file_stream_metrics.time_opening.start(); - self.state = FileStreamState::Open { future }; - } - Ok(None) => return Poll::Ready(None), - Err(e) => { - self.state = FileStreamState::Error; - return Poll::Ready(Some(Err(e))); - } - }, - FileStreamState::Open { future } => match ready!(future.poll_unpin(cx)) { - Ok(reader) => { - self.file_stream_metrics.files_opened.add(1); - self.file_stream_metrics.time_opening.stop(); - 
self.file_stream_metrics.time_scanning_until_data.start(); - self.file_stream_metrics.time_scanning_total.start(); - self.state = FileStreamState::Scan { reader }; - } - Err(e) => { - self.file_stream_metrics.file_open_errors.add(1); - match self.on_error { - OnError::Skip => { - self.file_stream_metrics.files_processed.add(1); - self.file_stream_metrics.time_opening.stop(); - self.state = FileStreamState::Idle - } - OnError::Fail => { - self.state = FileStreamState::Error; - return Poll::Ready(Some(Err(e))); - } - } - } - }, - FileStreamState::Scan { reader } => { - match ready!(reader.poll_next_unpin(cx)) { - Some(Ok(batch)) => { - self.file_stream_metrics.time_scanning_until_data.stop(); - self.file_stream_metrics.time_scanning_total.stop(); - let batch = match &mut self.remain { - Some(remain) => { - if *remain > batch.num_rows() { - *remain -= batch.num_rows(); - batch - } else { - let batch = batch.slice(0, *remain); - // Count this file and all remaining files - // we will never open. 
- let done = 1 + self.file_iter.len(); - self.file_stream_metrics - .files_processed - .add(done); - self.state = FileStreamState::Limit; - *remain = 0; - batch - } - } - None => batch, - }; - self.file_stream_metrics.time_scanning_total.start(); - return Poll::Ready(Some(Ok(batch))); - } - Some(Err(err)) => { - self.file_stream_metrics.file_scan_errors.add(1); - self.file_stream_metrics.time_scanning_until_data.stop(); - self.file_stream_metrics.time_scanning_total.stop(); - - match self.on_error { - OnError::Skip => { - self.file_stream_metrics.files_processed.add(1); - self.state = FileStreamState::Idle; - } - OnError::Fail => { - self.state = FileStreamState::Error; - return Poll::Ready(Some(Err(err))); - } - } + FileStreamState::Scan { scan_state: queue } => { + let action = queue.poll_scan(cx); + match action { + ScanAndReturn::Continue => continue, + ScanAndReturn::Done(result) => { + self.state = FileStreamState::Done; + return Poll::Ready(result); } - None => { - self.file_stream_metrics.files_processed.add(1); - self.file_stream_metrics.time_scanning_until_data.stop(); - self.file_stream_metrics.time_scanning_total.stop(); - self.state = FileStreamState::Idle; + ScanAndReturn::Error(err) => { + self.state = FileStreamState::Error; + return Poll::Ready(Some(Err(err))); } + ScanAndReturn::Return(result) => return result, } } - FileStreamState::Error | FileStreamState::Limit => { + FileStreamState::Error | FileStreamState::Done => { return Poll::Ready(None); } } @@ -201,9 +121,7 @@ impl Stream for FileStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - self.file_stream_metrics.time_processing.start(); let result = self.poll_inner(cx); - self.file_stream_metrics.time_processing.stop(); self.baseline_metrics.record_poll(result) } } @@ -238,33 +156,29 @@ pub trait FileOpener: Unpin + Send + Sync { fn open(&self, partitioned_file: PartitionedFile) -> Result; } -pub enum FileStreamState { - /// The idle state, no file is currently being read - 
Idle, - /// Currently performing asynchronous IO to obtain a stream of RecordBatch - /// for a given file - Open { - /// A [`FileOpenFuture`] returned by [`FileOpener::open`] - future: FileOpenFuture, - }, - /// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`] - /// returned by [`FileOpener::open`] +enum FileStreamState { + /// Actively processing readers, ready morsels, and planner work. Scan { - /// The reader instance - reader: BoxStream<'static, Result>, + /// The ready queues and active reader for the current file. + scan_state: Box, }, /// Encountered an error Error, - /// Reached the row limit - Limit, + /// Finished scanning all requested data + Done, } -/// A timer that can be started and stopped. #[cfg(test)] mod tests { use crate::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; + use crate::morsel::mocks::{ + IoFutureId, MockMorselizer, MockPlanner, MorselId, PollsToResolve, + }; use crate::tests::make_partition; use crate::{PartitionedFile, TableSchema}; + use arrow::array::{AsArray, RecordBatch}; + use arrow::datatypes::{DataType, Field, Int32Type, Schema}; + use datafusion_common::DataFusionError; use datafusion_common::error::Result; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; @@ -274,8 +188,6 @@ mod tests { use crate::file_stream::{FileOpenFuture, FileOpener, FileStreamBuilder, OnError}; use crate::test_util::MockSource; - use arrow::array::RecordBatch; - use arrow::datatypes::Schema; use datafusion_common::{assert_batches_eq, exec_err, internal_err}; @@ -747,7 +659,7 @@ mod tests { assert!(err.contains("FileStreamBuilder missing required partition")); let err = builder_error(FileStreamBuilder::new(&config).with_partition(0)); - assert!(err.contains("FileStreamBuilder missing required file_opener")); + assert!(err.contains("FileStreamBuilder missing required morselizer")); let err = builder_error( FileStreamBuilder::new(&config) @@ 
-770,4 +682,334 @@ mod tests { ); assert!(err.contains("FileStreamBuilder invalid partition index: 1")); } + + /// Verifies the simplest morsel-driven flow: one planner produces one + /// morsel immediately, and that morsel is then scanned to completion. + #[tokio::test] + async fn morsel_no_io() -> Result<()> { + let test = FileStreamMorselTest::new().with_file( + MockPlanner::builder("file1.parquet") + .return_morsel(MorselId(10), 42) + .return_none() + .build(), + ); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Output Stream ----- + Batch: 42 + Done + ----- File Stream Events ----- + morselize_file: file1.parquet + planner_created: file1.parquet + planner_called: file1.parquet + morsel_produced: file1.parquet, MorselId(10) + morsel_stream_started: MorselId(10) + morsel_stream_batch_produced: MorselId(10), BatchId(42) + morsel_stream_finished: MorselId(10) + "); + + Ok(()) + } + + /// Verifies that a planner can block on one I/O phase and then produce a + /// morsel containing two batches. 
+ #[tokio::test] + async fn morsel_single_io_two_batches() -> Result<()> { + let test = FileStreamMorselTest::new().with_file( + MockPlanner::builder("file1.parquet") + .return_io(IoFutureId(1), PollsToResolve(1)) + .return_morsel_batches(MorselId(10), vec![42, 43]) + .return_none() + .build(), + ); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Output Stream ----- + Batch: 42 + Batch: 43 + Done + ----- File Stream Events ----- + morselize_file: file1.parquet + planner_created: file1.parquet + planner_called: file1.parquet + io_future_created: file1.parquet, IoFutureId(1) + io_future_polled: file1.parquet, IoFutureId(1) + io_future_polled: file1.parquet, IoFutureId(1) + io_future_resolved: file1.parquet, IoFutureId(1) + planner_called: file1.parquet + morsel_produced: file1.parquet, MorselId(10) + morsel_stream_started: MorselId(10) + morsel_stream_batch_produced: MorselId(10), BatchId(42) + morsel_stream_batch_produced: MorselId(10), BatchId(43) + morsel_stream_finished: MorselId(10) + "); + + Ok(()) + } + + /// Verifies that a planner can traverse two sequential I/O phases before + /// producing one batch, similar to Parquet. 
+ #[tokio::test] + async fn morsel_two_ios_one_batch() -> Result<()> { + let test = FileStreamMorselTest::new().with_file( + MockPlanner::builder("file1.parquet") + .return_io(IoFutureId(1), PollsToResolve(0)) + .return_io(IoFutureId(2), PollsToResolve(0)) + .return_morsel(MorselId(10), 42) + .return_none() + .build(), + ); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Output Stream ----- + Batch: 42 + Done + ----- File Stream Events ----- + morselize_file: file1.parquet + planner_created: file1.parquet + planner_called: file1.parquet + io_future_created: file1.parquet, IoFutureId(1) + io_future_polled: file1.parquet, IoFutureId(1) + io_future_resolved: file1.parquet, IoFutureId(1) + planner_called: file1.parquet + io_future_created: file1.parquet, IoFutureId(2) + io_future_polled: file1.parquet, IoFutureId(2) + io_future_resolved: file1.parquet, IoFutureId(2) + planner_called: file1.parquet + morsel_produced: file1.parquet, MorselId(10) + morsel_stream_started: MorselId(10) + morsel_stream_batch_produced: MorselId(10), BatchId(42) + morsel_stream_finished: MorselId(10) + "); + + Ok(()) + } + + /// Verifies that a planner I/O future can fail and terminate the stream. 
+ #[tokio::test] + async fn morsel_io_error() -> Result<()> { + let test = FileStreamMorselTest::new().with_file( + MockPlanner::builder("file1.parquet") + .return_io_error( + IoFutureId(1), + PollsToResolve(0), + "io failed while opening file", + ) + .build(), + ); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Output Stream ----- + Error: io failed while opening file + Done + ----- File Stream Events ----- + morselize_file: file1.parquet + planner_created: file1.parquet + planner_called: file1.parquet + io_future_created: file1.parquet, IoFutureId(1) + io_future_polled: file1.parquet, IoFutureId(1) + io_future_errored: file1.parquet, IoFutureId(1), io failed while opening file + "); + + Ok(()) + } + + /// Verifies that planning can fail after a successful I/O phase. + #[tokio::test] + async fn morsel_plan_error_after_io() -> Result<()> { + let test = FileStreamMorselTest::new().with_file( + MockPlanner::builder("file1.parquet") + .return_io(IoFutureId(1), PollsToResolve(0)) + .return_error("planner failed after io") + .build(), + ); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Output Stream ----- + Error: planner failed after io + Done + ----- File Stream Events ----- + morselize_file: file1.parquet + planner_created: file1.parquet + planner_called: file1.parquet + io_future_created: file1.parquet, IoFutureId(1) + io_future_polled: file1.parquet, IoFutureId(1) + io_future_resolved: file1.parquet, IoFutureId(1) + planner_called: file1.parquet + "); + + Ok(()) + } + + /// Verifies that `FileStream` scans multiple files in order. 
+ #[tokio::test] + async fn morsel_multiple_files() -> Result<()> { + let test = FileStreamMorselTest::new() + .with_file( + MockPlanner::builder("file1.parquet") + .return_morsel(MorselId(10), 41) + .return_none() + .build(), + ) + .with_file( + MockPlanner::builder("file2.parquet") + .return_morsel(MorselId(11), 42) + .return_none() + .build(), + ); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Output Stream ----- + Batch: 41 + Batch: 42 + Done + ----- File Stream Events ----- + morselize_file: file1.parquet + planner_created: file1.parquet + planner_called: file1.parquet + morsel_produced: file1.parquet, MorselId(10) + morsel_stream_started: MorselId(10) + morsel_stream_batch_produced: MorselId(10), BatchId(41) + morsel_stream_finished: MorselId(10) + morselize_file: file2.parquet + planner_created: file2.parquet + planner_called: file2.parquet + morsel_produced: file2.parquet, MorselId(11) + morsel_stream_started: MorselId(11) + morsel_stream_batch_produced: MorselId(11), BatchId(42) + morsel_stream_finished: MorselId(11) + "); + + Ok(()) + } + + /// Verifies that a global limit can stop the stream before a second file is opened. 
+ #[tokio::test] + async fn morsel_limit_prevents_second_file() -> Result<()> { + let test = FileStreamMorselTest::new() + .with_file( + MockPlanner::builder("file1.parquet") + .return_morsel_batches(MorselId(10), vec![41, 42]) + .return_none() + .build(), + ) + .with_file( + MockPlanner::builder("file2.parquet") + .return_morsel(MorselId(11), 43) + .return_none() + .build(), + ) + .with_limit(1); + + // Note the snapshot should not ever see planner id2 + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Output Stream ----- + Batch: 41 + Done + ----- File Stream Events ----- + morselize_file: file1.parquet + planner_created: file1.parquet + planner_called: file1.parquet + morsel_produced: file1.parquet, MorselId(10) + morsel_stream_started: MorselId(10) + morsel_stream_batch_produced: MorselId(10), BatchId(41) + "); + + Ok(()) + } + + /// Tests how FileStream opens and processes files. + #[derive(Clone)] + struct FileStreamMorselTest { + morselizer: MockMorselizer, + file_names: Vec, + limit: Option, + } + + impl FileStreamMorselTest { + /// Creates an empty test harness. + fn new() -> Self { + Self { + morselizer: MockMorselizer::new(), + file_names: vec![], + limit: None, + } + } + + /// Adds one file and its root planner to the test input. + fn with_file(mut self, planner: MockPlanner) -> Self { + self.file_names.push(planner.file_path().to_string()); + self.morselizer = self.morselizer.with_file(planner); + self + } + + /// Sets a global output limit for the stream. + fn with_limit(mut self, limit: usize) -> Self { + self.limit = Some(limit); + self + } + + /// Runs the test returns combined output and scheduler trace text as a String. 
+ async fn run(self) -> Result { + let observer = self.morselizer.observer().clone(); + observer.clear(); + + let config = self.test_config(); + let metrics_set = ExecutionPlanMetricsSet::new(); + let mut stream = FileStreamBuilder::new(&config) + .with_partition(0) + .with_morselizer(Box::new(self.morselizer)) + .with_metrics(&metrics_set) + .build()?; + + let mut stream_contents = Vec::new(); + while let Some(result) = stream.next().await { + match result { + Ok(batch) => { + let col = batch.column(0).as_primitive::(); + let batch_id = col.value(0); + stream_contents.push(format!("Batch: {batch_id}")); + } + Err(e) => { + // Pull the actual message for external errors rather than + // relying on DataFusionError formatting, which changes + // if backtraces are enabled, etc + let message = if let DataFusionError::External(generic) = e { + generic.to_string() + } else { + e.to_string() + }; + stream_contents.push(format!("Error: {message}")); + } + } + } + stream_contents.push("Done".to_string()); + + Ok(format!( + "----- Output Stream -----\n{}\n----- File Stream Events -----\n{}", + stream_contents.join("\n"), + observer.format_events() + )) + } + + /// Builds the `FileScanConfig` for the configured mock file set. 
+ fn test_config(&self) -> FileScanConfig { + let file_group = self + .file_names + .iter() + .map(|name| PartitionedFile::new(name, 10)) + .collect(); + let table_schema = TableSchema::new( + Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])), + vec![], + ); + FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::new(MockSource::new(table_schema)), + ) + .with_file_group(file_group) + .with_limit(self.limit) + .build() + } + } } diff --git a/datafusion/datasource/src/file_stream/scan_state.rs b/datafusion/datasource/src/file_stream/scan_state.rs new file mode 100644 index 000000000000..2d6f4756ee27 --- /dev/null +++ b/datafusion/datasource/src/file_stream/scan_state.rs @@ -0,0 +1,283 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::VecDeque; +use std::task::{Context, Poll}; + +use crate::PartitionedFile; +use crate::morsel::{Morsel, MorselPlanner, Morselizer, PendingMorselPlanner}; +use arrow::record_batch::RecordBatch; +use datafusion_common::{DataFusionError, Result}; +use datafusion_physical_plan::metrics::ScopedTimerGuard; +use futures::stream::BoxStream; +use futures::{FutureExt as _, StreamExt as _}; + +use super::{FileStreamMetrics, OnError}; + +/// State [`FileStreamState::Scan`]. +/// +/// There is one `ScanState` per `FileStream`, and thus per output partition. +/// +/// It groups together the lifecycle of scanning that partition's files: +/// unopened files, CPU-ready planners, pending planner I/O, ready morsels, +/// the active reader, and the metrics associated with processing that work. +/// +/// # State Transitions +/// +/// ```text +/// file_iter +/// | +/// v +/// morselizer.plan_file(file) +/// | +/// v +/// ready_planners ---> plan() ---> ready_morsels ---> into_stream() ---> reader ---> RecordBatches +/// ^ | +/// | v +/// | pending_planner +/// | | +/// | v +/// +-------- poll until ready +/// ``` +/// +/// [`FileStreamState::Scan`]: super::FileStreamState::Scan +pub(super) struct ScanState { + /// Files that still need to be planned. + file_iter: VecDeque, + /// Remaining row limit, if any. + remain: Option, + /// The morselizer used to plan files. + morselizer: Box, + /// Behavior if opening or scanning a file fails. + on_error: OnError, + /// CPU-ready planners for the current file. + ready_planners: VecDeque>, + /// Ready morsels for the current file. + ready_morsels: VecDeque>, + /// The active reader, if any. + reader: Option>>, + /// The single planner currently blocked on I/O, if any. + pending_planner: Option, + /// Metrics for the active scan queues. 
+ metrics: FileStreamMetrics, +} + +impl ScanState { + pub(super) fn new( + file_iter: impl Into>, + remain: Option, + morselizer: Box, + on_error: OnError, + metrics: FileStreamMetrics, + ) -> Self { + let file_iter = file_iter.into(); + Self { + file_iter, + remain, + morselizer, + on_error, + ready_planners: Default::default(), + ready_morsels: Default::default(), + reader: None, + pending_planner: None, + metrics, + } + } + + /// Updates how scan errors are handled while the stream is still active. + pub(super) fn set_on_error(&mut self, on_error: OnError) { + self.on_error = on_error; + } + + /// Drives one iteration of the active scan state. + /// + /// Work is attempted in this order: + /// 1. resolve any pending planner I/O + /// 2. poll the active reader + /// 3. turn a ready morsel into the active reader + /// 4. run CPU planning on a ready planner + /// 5. morselize the next unopened file + /// + /// The return [`ScanAndReturn`] tells `poll_inner` how to update the + /// outer `FileStreamState`. 
+ pub(super) fn poll_scan(&mut self, cx: &mut Context<'_>) -> ScanAndReturn { + let _processing_timer: ScopedTimerGuard<'_> = + self.metrics.time_processing.timer(); + + // Try and resolve outstanding IO first + if let Some(mut pending_planner) = self.pending_planner.take() { + match pending_planner.poll_unpin(cx) { + // IO is still pending + Poll::Pending => { + self.pending_planner = Some(pending_planner); + return ScanAndReturn::Return(Poll::Pending); + } + // IO resolved, and the planner is ready for CPU work + Poll::Ready(Ok(planner)) => { + self.ready_planners.push_back(planner); + return ScanAndReturn::Continue; + } + // IO Error + Poll::Ready(Err(err)) => { + self.metrics.file_open_errors.add(1); + self.metrics.time_opening.stop(); + return match self.on_error { + OnError::Skip => { + self.metrics.files_processed.add(1); + ScanAndReturn::Continue + } + OnError::Fail => ScanAndReturn::Error(err), + }; + } + } + } + + // Next try and get the next batch from the active reader, if any. + if let Some(reader) = self.reader.as_mut() { + match reader.poll_next_unpin(cx) { + // Morsels should ideally only expose ready-to-decode streams, + // but tolerate pending readers here. + Poll::Pending => return ScanAndReturn::Return(Poll::Pending), + Poll::Ready(Some(Ok(batch))) => { + self.metrics.time_scanning_until_data.stop(); + self.metrics.time_scanning_total.stop(); + // Apply any remaining row limit. 
+ let (batch, finished) = match &mut self.remain { + Some(remain) => { + if *remain > batch.num_rows() { + *remain -= batch.num_rows(); + self.metrics.time_scanning_total.start(); + (batch, false) + } else { + let batch = batch.slice(0, *remain); + let done = 1 + self.file_iter.len(); + self.metrics.files_processed.add(done); + *remain = 0; + (batch, true) + } + } + None => { + self.metrics.time_scanning_total.start(); + (batch, false) + } + }; + return if finished { + ScanAndReturn::Done(Some(Ok(batch))) + } else { + ScanAndReturn::Return(Poll::Ready(Some(Ok(batch)))) + }; + } + Poll::Ready(Some(Err(err))) => { + self.reader = None; + self.metrics.file_scan_errors.add(1); + self.metrics.time_scanning_until_data.stop(); + self.metrics.time_scanning_total.stop(); + return match self.on_error { + OnError::Skip => { + self.metrics.files_processed.add(1); + ScanAndReturn::Continue + } + OnError::Fail => ScanAndReturn::Error(err), + }; + } + Poll::Ready(None) => { + self.reader = None; + self.metrics.files_processed.add(1); + self.metrics.time_scanning_until_data.stop(); + self.metrics.time_scanning_total.stop(); + return ScanAndReturn::Continue; + } + } + } + + // No active reader, but a morsel is ready to become the reader. + if let Some(morsel) = self.ready_morsels.pop_front() { + self.metrics.time_opening.stop(); + self.metrics.time_scanning_until_data.start(); + self.metrics.time_scanning_total.start(); + self.reader = Some(morsel.into_stream()); + return ScanAndReturn::Continue; + } + + // No reader or morsel, so try to produce more work via CPU planning. + if let Some(planner) = self.ready_planners.pop_front() { + return match planner.plan() { + Ok(Some(mut plan)) => { + // Queue any newly-ready morsels, planners, or planner I/O. 
+ self.ready_morsels.extend(plan.take_morsels()); + self.ready_planners.extend(plan.take_ready_planners()); + if let Some(pending_planner) = plan.take_pending_planner() { + self.pending_planner = Some(pending_planner); + } + ScanAndReturn::Continue + } + Ok(None) => { + self.metrics.files_processed.add(1); + self.metrics.time_opening.stop(); + ScanAndReturn::Continue + } + Err(err) => { + self.metrics.file_open_errors.add(1); + self.metrics.time_opening.stop(); + match self.on_error { + OnError::Skip => { + self.metrics.files_processed.add(1); + ScanAndReturn::Continue + } + OnError::Fail => ScanAndReturn::Error(err), + } + } + }; + } + + // No outstanding work remains, so morselize the next unopened file. + let part_file = match self.file_iter.pop_front() { + Some(part_file) => part_file, + None => return ScanAndReturn::Done(None), + }; + + self.metrics.time_opening.start(); + match self.morselizer.plan_file(part_file) { + Ok(planner) => { + self.metrics.files_opened.add(1); + self.ready_planners.push_back(planner); + ScanAndReturn::Continue + } + Err(err) => match self.on_error { + OnError::Skip => { + self.metrics.file_open_errors.add(1); + self.metrics.time_opening.stop(); + self.metrics.files_processed.add(1); + ScanAndReturn::Continue + } + OnError::Fail => ScanAndReturn::Error(err), + }, + } + } +} + +/// What should be done on the next iteration of [`ScanState::poll_scan`]? +pub(super) enum ScanAndReturn { + /// Poll again. + Continue, + /// Return the provided result without changing the outer state. + Return(Poll>>), + /// Update the outer `FileStreamState` to `Done` and return the provided result. + Done(Option>), + /// Update the outer `FileStreamState` to `Error` and return the provided error. 
+ Error(DataFusionError), +} diff --git a/datafusion/datasource/src/morsel/adapters.rs b/datafusion/datasource/src/morsel/adapters.rs new file mode 100644 index 000000000000..6fa6d4916771 --- /dev/null +++ b/datafusion/datasource/src/morsel/adapters.rs @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::PartitionedFile; +use crate::file_stream::FileOpener; +use crate::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; +use arrow::array::RecordBatch; +use datafusion_common::Result; +use futures::FutureExt; +use futures::stream::BoxStream; +use std::fmt::Debug; +use std::sync::Arc; + +/// Adapt a legacy [`FileOpener`] to the morsel API. +/// +/// This preserves backwards compatibility for file formats that have not yet +/// implemented a native [`Morselizer`]. 
+pub struct FileOpenerMorselizer { + file_opener: Arc, +} + +impl Debug for FileOpenerMorselizer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FileOpenerMorselizer") + .field("file_opener", &"...") + .finish() + } +} + +impl FileOpenerMorselizer { + pub fn new(file_opener: Arc) -> Self { + Self { file_opener } + } +} + +impl Morselizer for FileOpenerMorselizer { + fn plan_file(&self, file: PartitionedFile) -> Result> { + Ok(Box::new(FileOpenFutureMorselPlanner::new( + Arc::clone(&self.file_opener), + file, + ))) + } +} + +enum FileOpenFutureMorselPlanner { + Unopened { + file_opener: Arc, + file: Box, + }, + ReadyStream(BoxStream<'static, Result>), +} + +impl Debug for FileOpenFutureMorselPlanner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Unopened { .. } => f + .debug_tuple("FileOpenFutureMorselPlanner::Unopened") + .finish(), + Self::ReadyStream(_) => f + .debug_tuple("FileOpenFutureMorselPlanner::ReadyStream") + .finish(), + } + } +} + +impl FileOpenFutureMorselPlanner { + fn new(file_opener: Arc, file: PartitionedFile) -> Self { + Self::Unopened { + file_opener, + file: Box::new(file), + } + } +} + +impl MorselPlanner for FileOpenFutureMorselPlanner { + fn plan(self: Box) -> Result> { + match *self { + Self::Unopened { file_opener, file } => { + let io_future = async move { + let stream = file_opener.open(*file)?.await?; + Ok(Box::new(Self::ReadyStream(stream)) as Box) + } + .boxed(); + Ok(Some(MorselPlan::new().with_pending_planner(io_future))) + } + Self::ReadyStream(stream) => Ok(Some( + MorselPlan::new() + .with_morsels(vec![Box::new(FileStreamMorsel { stream })]), + )), + } + } +} + +struct FileStreamMorsel { + stream: BoxStream<'static, Result>, +} + +impl Debug for FileStreamMorsel { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FileStreamMorsel").finish_non_exhaustive() + } +} + +impl Morsel for FileStreamMorsel { + 
fn into_stream(self: Box) -> BoxStream<'static, Result> { + self.stream + } +} diff --git a/datafusion/datasource/src/morsel/mocks.rs b/datafusion/datasource/src/morsel/mocks.rs new file mode 100644 index 000000000000..e23171ae2981 --- /dev/null +++ b/datafusion/datasource/src/morsel/mocks.rs @@ -0,0 +1,612 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Test-only mocks for exercising the morsel-driven `FileStream` scheduler. + +use std::collections::{HashMap, VecDeque}; +use std::fmt::{Display, Formatter}; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; + +use crate::PartitionedFile; +use crate::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; +use arrow::array::{Int32Array, RecordBatch}; +use arrow::datatypes::{DataType, Field, Schema}; +use datafusion_common::{DataFusionError, Result, internal_datafusion_err}; +use futures::stream::BoxStream; +use futures::{Future, FutureExt}; + +// Use thin wrappers around usize so the test setups are more explicit + +/// Identifier for a mock morsel in scheduler snapshots. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) struct MorselId(pub usize); + +/// Identifier for a produced batch in scheduler snapshots. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) struct BatchId(pub usize); + +/// Identifier for a mock I/O future in scheduler snapshots. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) struct IoFutureId(pub usize); + +/// Number of pending polls before a mock I/O future resolves. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) struct PollsToResolve(pub usize); + +/// Error message returned by a mock planner or I/O future. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct MockError(pub String); + +impl Display for MockError { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl std::error::Error for MockError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + None + } +} + +/// Scheduler-visible event captured by the mock morsel test harness. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum MorselEvent { + MorselizeFile { + path: String, + }, + PlannerCreated { + planner_name: String, + }, + PlannerCalled { + planner_name: String, + }, + IoFutureCreated { + planner_name: String, + io_future_id: IoFutureId, + }, + IoFuturePolled { + planner_name: String, + io_future_id: IoFutureId, + }, + IoFutureResolved { + planner_name: String, + io_future_id: IoFutureId, + }, + IoFutureErrored { + planner_name: String, + io_future_id: IoFutureId, + message: String, + }, + MorselProduced { + planner_name: String, + morsel_id: MorselId, + }, + MorselStreamStarted { + morsel_id: MorselId, + }, + MorselStreamBatchProduced { + morsel_id: MorselId, + batch_id: BatchId, + }, + MorselStreamFinished { + morsel_id: MorselId, + }, +} + +impl Display for MorselEvent { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + MorselEvent::MorselizeFile { path } => { + write!(f, "morselize_file: {path}") + } + MorselEvent::PlannerCreated { planner_name } => { + write!(f, "planner_created: {planner_name}") + } + MorselEvent::PlannerCalled { 
planner_name } => { + write!(f, "planner_called: {planner_name}") + } + MorselEvent::IoFutureCreated { + planner_name, + io_future_id, + } => write!(f, "io_future_created: {planner_name}, {io_future_id:?}"), + MorselEvent::IoFuturePolled { + planner_name, + io_future_id, + } => write!(f, "io_future_polled: {planner_name}, {io_future_id:?}"), + MorselEvent::IoFutureResolved { + planner_name, + io_future_id, + } => write!(f, "io_future_resolved: {planner_name}, {io_future_id:?}"), + MorselEvent::IoFutureErrored { + planner_name, + io_future_id, + message, + } => write!( + f, + "io_future_errored: {planner_name}, {io_future_id:?}, {message}" + ), + MorselEvent::MorselProduced { + planner_name, + morsel_id, + } => write!(f, "morsel_produced: {planner_name}, {morsel_id:?}"), + MorselEvent::MorselStreamStarted { morsel_id } => { + write!(f, "morsel_stream_started: {morsel_id:?}") + } + MorselEvent::MorselStreamBatchProduced { + morsel_id, + batch_id, + } => write!( + f, + "morsel_stream_batch_produced: {morsel_id:?}, {batch_id:?}" + ), + MorselEvent::MorselStreamFinished { morsel_id } => { + write!(f, "morsel_stream_finished: {morsel_id:?}") + } + } + } +} + +/// Shared observer that records scheduler events for snapshot tests. +#[derive(Debug, Default, Clone)] +pub(crate) struct MorselObserver { + events: Arc>>, +} + +impl MorselObserver { + /// Clears any previously recorded events. + pub(crate) fn clear(&self) { + self.events.lock().unwrap().clear(); + } + + /// Records one new scheduler event. + pub(crate) fn push(&self, event: MorselEvent) { + self.events.lock().unwrap().push(event); + } + + /// Formats all recorded events into a stable, snapshot-friendly trace. + pub(crate) fn format_events(&self) -> String { + self.events + .lock() + .unwrap() + .iter() + .map(ToString::to_string) + .collect::>() + .join("\n") + } +} + +/// Declarative planner spec used by the mock morselizer. 
+#[derive(Debug, Clone)] +pub(crate) struct MockPlanner { + file_path: String, + steps: VecDeque, +} + +impl MockPlanner { + /// Creates a fluent builder for one mock planner. + pub(crate) fn builder(file_path: impl Into) -> MockPlannerBuilder { + MockPlannerBuilder { + file_path: file_path.into(), + ..Default::default() + } + } + + /// Returns the file path associated with this planner. + pub(crate) fn file_path(&self) -> &str { + &self.file_path + } +} + +/// One scheduler-visible step in a mock planner's lifecycle. +#[derive(Debug, Clone)] +enum PlannerStep { + Morsel { + morsel_id: MorselId, + batch_ids: Vec, + }, + Io { + io_future_id: IoFutureId, + polls_to_resolve: PollsToResolve, + result: std::result::Result<(), MockError>, + }, + Error { + error: MockError, + }, + None, +} + +/// Fluent builder for [`MockPlanner`] test specs. +#[derive(Debug, Default)] +pub(crate) struct MockPlannerBuilder { + file_path: String, + steps: Vec, +} + +impl MockPlannerBuilder { + /// Adds one planning step that returns a single ready morsel. + pub(crate) fn return_morsel(mut self, morsel_id: MorselId, batch_id: i32) -> Self { + self.steps.push(PlannerStep::Morsel { + morsel_id, + batch_ids: vec![batch_id], + }); + self + } + + /// Adds one planning step that returns a morsel with multiple ready batches. + pub(crate) fn return_morsel_batches( + mut self, + morsel_id: MorselId, + batch_ids: Vec, + ) -> Self { + self.steps.push(PlannerStep::Morsel { + morsel_id, + batch_ids, + }); + self + } + + /// Adds one planning step that returns a single outstanding I/O future. + pub(crate) fn return_io( + mut self, + io_future_id: IoFutureId, + polls_to_resolve: PollsToResolve, + ) -> Self { + self.steps.push(PlannerStep::Io { + io_future_id, + polls_to_resolve, + result: Ok(()), + }); + self + } + + /// Adds one planning step that returns a failing I/O future. 
+ pub(crate) fn return_io_error( + mut self, + io_future_id: IoFutureId, + polls_to_resolve: PollsToResolve, + message: impl Into, + ) -> Self { + self.steps.push(PlannerStep::Io { + io_future_id, + polls_to_resolve, + result: Err(MockError(message.into())), + }); + self + } + + /// Adds one planning step that reports the planner is exhausted. + pub(crate) fn return_none(mut self) -> Self { + self.steps.push(PlannerStep::None); + self + } + + /// Adds one planning step that fails during CPU planning. + pub(crate) fn return_error(mut self, message: impl Into) -> Self { + self.steps.push(PlannerStep::Error { + error: MockError(message.into()), + }); + self + } + + /// Finalizes the configured mock planner. + pub(crate) fn build(self) -> MockPlanner { + let Self { file_path, steps } = self; + + MockPlanner { + file_path, + steps: VecDeque::from(steps), + } + } +} + +/// Mock [`Morselizer`] that maps file paths to fixed planner specs. +#[derive(Debug, Clone, Default)] +pub(crate) struct MockMorselizer { + observer: MorselObserver, + files: HashMap, +} + +impl MockMorselizer { + /// Creates an empty mock morselizer. + pub(crate) fn new() -> Self { + Self::default() + } + + /// Returns the shared event observer for this test harness. + pub(crate) fn observer(&self) -> &MorselObserver { + &self.observer + } + + /// Associates a file path with the planner spec used to open it. 
+ pub(crate) fn with_file(mut self, planner: MockPlanner) -> Self { + self.files.insert(planner.file_path.clone(), planner); + self + } +} + +impl Morselizer for MockMorselizer { + fn plan_file(&self, file: PartitionedFile) -> Result> { + let path = file.object_meta.location.to_string(); + self.observer + .push(MorselEvent::MorselizeFile { path: path.clone() }); + + let planner = self.files.get(&path).cloned().ok_or_else(|| { + internal_datafusion_err!("No mock planner configured for file: {path}") + })?; + + self.observer.push(MorselEvent::PlannerCreated { + planner_name: planner.file_path.clone(), + }); + + Ok(Box::new(MockMorselPlanner::new( + self.observer.clone(), + planner, + ))) + } +} + +/// Concrete mock planner that executes one predefined step per `plan()` call. +#[derive(Debug)] +struct MockMorselPlanner { + observer: MorselObserver, + planner_name: String, + steps: VecDeque, +} + +impl MockMorselPlanner { + /// Creates a concrete planner from its declarative test spec. + fn new(observer: MorselObserver, planner: MockPlanner) -> Self { + Self { + observer, + planner_name: planner.file_path, + steps: planner.steps, + } + } +} + +/// Rebuilds the mock planner continuation after one step completes. 
+fn remaining_planners( + observer: MorselObserver, + planner_name: String, + steps: VecDeque, +) -> Vec> { + let only_none_remaining = + matches!(steps.front(), Some(PlannerStep::None)) && steps.len() == 1; + + if steps.is_empty() || only_none_remaining { + Vec::new() + } else { + vec![Box::new(MockMorselPlanner { + observer, + planner_name, + steps, + }) as Box] + } +} + +impl MorselPlanner for MockMorselPlanner { + fn plan(self: Box) -> Result> { + let Self { + observer, + planner_name, + mut steps, + } = *self; + + observer.push(MorselEvent::PlannerCalled { + planner_name: planner_name.clone(), + }); + + let Some(step) = steps.pop_front() else { + return Ok(None); + }; + + match step { + PlannerStep::Morsel { + morsel_id, + batch_ids, + } => { + observer.push(MorselEvent::MorselProduced { + planner_name: planner_name.clone(), + morsel_id, + }); + Ok(Some( + MorselPlan::new() + .with_morsels(vec![Box::new(MockMorsel::new( + observer.clone(), + morsel_id, + batch_ids, + ))]) + .with_planners(remaining_planners( + observer.clone(), + planner_name.clone(), + steps, + )), + )) + } + PlannerStep::Io { + io_future_id, + polls_to_resolve, + result, + } => { + observer.push(MorselEvent::IoFutureCreated { + planner_name: planner_name.clone(), + io_future_id, + }); + let io_future = MockIoFuture::new( + observer.clone(), + planner_name.clone(), + io_future_id, + polls_to_resolve, + result, + ) + .map(move |result| { + result?; + Ok(Box::new(MockMorselPlanner { + observer, + planner_name, + steps, + }) as Box) + }) + .boxed(); + Ok(Some(MorselPlan::new().with_pending_planner(io_future))) + } + PlannerStep::Error { error } => { + Err(DataFusionError::External(Box::new(error))) + } + PlannerStep::None => Ok(None), + } + } +} + +/// Concrete morsel used by the mock scheduler tests. 
+#[derive(Debug)] +pub(crate) struct MockMorsel { + observer: MorselObserver, + morsel_id: MorselId, + batch_ids: Vec, +} + +impl MockMorsel { + /// Creates a mock morsel with a deterministic sequence of batches. + fn new(observer: MorselObserver, morsel_id: MorselId, batch_ids: Vec) -> Self { + Self { + observer, + morsel_id, + batch_ids, + } + } +} + +impl Morsel for MockMorsel { + fn into_stream(self: Box) -> BoxStream<'static, Result> { + self.observer.push(MorselEvent::MorselStreamStarted { + morsel_id: self.morsel_id, + }); + Box::pin(MockMorselStream { + observer: self.observer.clone(), + morsel_id: self.morsel_id, + batch_ids: self.batch_ids.into(), + finished: false, + }) + } +} + +/// Stream returned by [`MockMorsel::into_stream`]. +struct MockMorselStream { + observer: MorselObserver, + morsel_id: MorselId, + batch_ids: VecDeque, + finished: bool, +} + +impl futures::Stream for MockMorselStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + if let Some(batch_id) = self.batch_ids.pop_front() { + self.observer.push(MorselEvent::MorselStreamBatchProduced { + morsel_id: self.morsel_id, + batch_id: BatchId(batch_id as usize), + }); + return Poll::Ready(Some(Ok(single_value_batch(batch_id)))); + } + + if !self.finished { + self.finished = true; + self.observer.push(MorselEvent::MorselStreamFinished { + morsel_id: self.morsel_id, + }); + } + + Poll::Ready(None) + } +} + +/// Deterministic future used to simulate planner I/O in tests. +struct MockIoFuture { + observer: MorselObserver, + planner_name: String, + io_future_id: IoFutureId, + pending_polls_remaining: usize, + result: std::result::Result<(), MockError>, +} + +impl MockIoFuture { + /// Creates a future that resolves after `io_polls` pending polls. 
+ fn new( + observer: MorselObserver, + planner_name: String, + io_future_id: IoFutureId, + polls_to_resolve: PollsToResolve, + result: std::result::Result<(), MockError>, + ) -> Self { + Self { + observer, + planner_name, + io_future_id, + pending_polls_remaining: polls_to_resolve.0, + result, + } + } +} + +impl Future for MockIoFuture { + type Output = Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.observer.push(MorselEvent::IoFuturePolled { + planner_name: self.planner_name.clone(), + io_future_id: self.io_future_id, + }); + + if self.pending_polls_remaining > 0 { + self.pending_polls_remaining -= 1; + cx.waker().wake_by_ref(); + return Poll::Pending; + } + + match &self.result { + Ok(()) => { + self.observer.push(MorselEvent::IoFutureResolved { + planner_name: self.planner_name.clone(), + io_future_id: self.io_future_id, + }); + Poll::Ready(Ok(())) + } + Err(e) => { + self.observer.push(MorselEvent::IoFutureErrored { + planner_name: self.planner_name.clone(), + io_future_id: self.io_future_id, + message: e.0.clone(), + }); + Poll::Ready(Err(DataFusionError::External(Box::new(e.clone())))) + } + } + } +} + +/// Creates a one-row batch so snapshot output stays compact and readable. +fn single_value_batch(value: i32) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])); + RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![value]))]).unwrap() +} diff --git a/datafusion/datasource/src/morsel/mod.rs b/datafusion/datasource/src/morsel/mod.rs index 5f200d779469..7b5066ca07a2 100644 --- a/datafusion/datasource/src/morsel/mod.rs +++ b/datafusion/datasource/src/morsel/mod.rs @@ -26,7 +26,12 @@ //! It is inspired by the paper [Morsel-Driven Parallelism: A NUMA-Aware Query //! Evaluation Framework for the Many-Core Age](https://db.in.tum.de/~leis/papers/morsels.pdf). 
+mod adapters; +#[cfg(test)] +pub(crate) mod mocks; + use crate::PartitionedFile; +pub(crate) use adapters::FileOpenerMorselizer; use arrow::array::RecordBatch; use datafusion_common::Result; use futures::FutureExt;