From 9d4457ca3ab44228a2631d7cf89bdd36dd2cb7ac Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 31 Mar 2026 11:31:49 -0400 Subject: [PATCH 1/9] Introduce Morselizer API --- datafusion/datasource-parquet/src/opener.rs | 406 +++++++++++++++----- datafusion/datasource-parquet/src/source.rs | 56 +-- datafusion/datasource/src/mod.rs | 1 + datafusion/datasource/src/morsel/mod.rs | 174 +++++++++ 4 files changed, 507 insertions(+), 130 deletions(-) create mode 100644 datafusion/datasource/src/morsel/mod.rs diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 6621706c35c81..dcecce51286c3 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! [`ParquetOpener`] state machine for opening Parquet files +//! [`ParquetOpener`] and [`ParquetMorselizer`] state machines for opening Parquet files use crate::page_filter::PagePruningAccessPlanFilter; use crate::row_filter::build_projection_read_plan; @@ -26,15 +26,21 @@ use crate::{ }; use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; +use datafusion_common::internal_datafusion_err; +use datafusion_common::internal_err; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; +use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; use datafusion_physical_expr::projection::{ProjectionExprs, Projector}; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr_adapter::replace_columns_with_literals; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; +use std::fmt; use std::future::Future; use std::mem; use std::pin::Pin; use std::sync::Arc; +use std::sync::mpsc; +use std::sync::mpsc::{Receiver, TryRecvError}; use std::task::{Context, Poll}; use arrow::datatypes::{Schema, SchemaRef, TimeUnit}; @@ -77,12 +83,26 @@ use parquet::bloom_filter::Sbbf; use parquet::errors::ParquetError; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; -/// Entry point for opening a Parquet file +/// Implements [`FileOpener`] for Parquet +#[derive(Clone)] +pub(super) struct ParquetOpener { + pub(super) morselizer: ParquetMorselizer, +} + +impl FileOpener for ParquetOpener { + fn open(&self, partitioned_file: PartitionedFile) -> Result { + let future = ParquetOpenFuture::new(&self.morselizer, partitioned_file)?; + Ok(Box::pin(future)) + } +} + +/// Stateless Parquet morselizer implementation. /// /// Reading a Parquet file is a multi-stage process, with multiple CPU-intensive /// steps interspersed with I/O steps. The code in this module implements the steps /// as an explicit state machine -- see [`ParquetOpenState`] for details. -pub(super) struct ParquetOpener { +#[derive(Clone)] +pub(super) struct ParquetMorselizer { /// Execution partition index pub(crate) partition_index: usize, /// Projection to apply on top of the table schema (i.e. can reference partition columns). @@ -137,6 +157,23 @@ pub(super) struct ParquetOpener { pub reverse_row_groups: bool, } +impl fmt::Debug for ParquetMorselizer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ParquetMorselizer") + .field("partition_index", &self.partition_index) + .field("preserve_order", &self.preserve_order) + .field("enable_page_index", &self.enable_page_index) + .field("enable_bloom_filter", &self.enable_bloom_filter) + .finish() + } +} + +impl Morselizer for ParquetMorselizer { + fn plan_file(&self, file: PartitionedFile) -> Result> { + Ok(Box::new(ParquetMorselPlanner::try_new(self, file)?)) + } +} + /// States for [`ParquetOpenFuture`] /// /// These states correspond to the steps required to read and apply various @@ -216,6 +253,27 @@ enum ParquetOpenState { Done, } +impl fmt::Debug for ParquetOpenState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let state = match self { + ParquetOpenState::Start { .. } => "Start", + #[cfg(feature = "parquet_encryption")] + ParquetOpenState::LoadEncryption(_) => "LoadEncryption", + ParquetOpenState::PruneFile(_) => "PruneFile", + ParquetOpenState::LoadMetadata(_) => "LoadMetadata", + ParquetOpenState::PrepareFilters(_) => "PrepareFilters", + ParquetOpenState::LoadPageIndex(_) => "LoadPageIndex", + ParquetOpenState::PruneWithStatistics(_) => "PruneWithStatistics", + ParquetOpenState::LoadBloomFilters(_) => "LoadBloomFilters", + ParquetOpenState::PruneWithBloomFilters(_) => "PruneWithBloomFilters", + ParquetOpenState::BuildStream(_) => "BuildStream", + ParquetOpenState::Ready(_) => "Ready", + ParquetOpenState::Done => "Done", + }; + f.write_str(state) + } +} + struct PreparedParquetOpen { partition_index: usize, partitioned_file: PartitionedFile, @@ -290,37 +348,13 @@ struct BloomFiltersLoadedParquetOpen { row_group_bloom_filters: Vec, } -/// Implements state machine described in [`ParquetOpenState`] -struct ParquetOpenFuture { - state: ParquetOpenState, -} - -impl ParquetOpenFuture { - #[cfg(feature = "parquet_encryption")] - fn new(prepared: PreparedParquetOpen, encryption_context: EncryptionContext) -> Self { - Self { - state: ParquetOpenState::Start { - prepared: Box::new(prepared), - encryption_context: Arc::new(encryption_context), - }, - } - } - - #[cfg(not(feature = "parquet_encryption"))] - fn new(prepared: PreparedParquetOpen) -> Self { - Self { - state: ParquetOpenState::Start { - prepared: Box::new(prepared), - }, - } - } -} - impl ParquetOpenState { /// Applies one CPU-only state transition. /// /// `Load*` states do not transition here and are returned unchanged so the /// driver loop can poll their inner futures separately. + /// + /// Implements state machine described in [`ParquetOpenState`] fn transition(self) -> Result { match self { ParquetOpenState::Start { @@ -392,93 +426,258 @@ impl ParquetOpenState { } } +/// Adapter for a [`MorselPlanner`] to the [`FileOpener`] API +/// +/// Implements state machine described in [`ParquetOpenState`] +struct ParquetOpenFuture { + planner: Box, + pending_io: Option>>, + ready_morsels: VecDeque>, +} + +impl ParquetOpenFuture { + fn new( + morselizer: &ParquetMorselizer, + partitioned_file: PartitionedFile, + ) -> Result { + Ok(Self { + planner: morselizer.plan_file(partitioned_file)?, + pending_io: None, + ready_morsels: VecDeque::new(), + }) + } +} + impl Future for ParquetOpenFuture { type Output = Result>>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - let state = mem::replace(&mut self.state, ParquetOpenState::Done); - let mut state = state.transition()?; + // If waiting on IO, poll + if let Some(io_future) = self.pending_io.as_mut() { + ready!(io_future.poll_unpin(cx))?; + self.pending_io = None; + } + + // have a morsel ready to go, return that + if let Some(morsel) = self.ready_morsels.pop_front() { + return Poll::Ready(Ok(morsel.into_stream())); + } + + // Planner did not produce any stream (for example, it pruned the entire file) + let Some(mut plan) = self.planner.plan()? else { + return Poll::Ready(Ok(futures::stream::empty().boxed())); + }; + + let child_planners = plan.take_planners(); + if !child_planners.is_empty() { + return Poll::Ready(internal_err!( + "Parquet FileOpener adapter does not support child morsel planners" + )); + } + + self.ready_morsels = plan.take_morsels().into(); + + if let Some(io_future) = plan.take_io_future() { + self.pending_io = Some(io_future); + } + } + } +} + +/// Implements the Morsel API +struct ParquetStreamMorsel { + stream: BoxStream<'static, Result>, +} + +impl ParquetStreamMorsel { + fn new(stream: BoxStream<'static, Result>) -> Self { + Self { stream } + } +} + +impl fmt::Debug for ParquetStreamMorsel { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ParquetStreamMorsel") + .finish_non_exhaustive() + } +} + +impl Morsel for ParquetStreamMorsel { + fn into_stream(self: Box) -> BoxStream<'static, Result> { + self.stream + } +} + +/// Stateful planner for opening a single parquet file via the morsel APIs. +enum ParquetMorselPlanner { + /// Ready to perform CPU-only planning work. + Ready(ParquetOpenState), + /// Waiting for an I/O future to produce the next planner state. + /// + /// Callers must not call [`MorselPlanner::plan`] again until the + /// corresponding I/O future has completed and its result is ready to + /// receive from the channel. + /// + /// Doing so is a protocol violation and transitions the planner to + /// [`ParquetMorselPlanner::Errored`]. + Waiting(Receiver>), + /// Actively planning (this state should be replaced by end of the call to plan) + Planning, + /// An earlier planning attempt returned an error. + Errored, +} + +impl fmt::Debug for ParquetMorselPlanner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Ready(state) => f + .debug_tuple("ParquetMorselPlanner::Ready") + .field(state) + .finish(), + Self::Waiting(_) => f + .debug_tuple("ParquetMorselPlanner::Waiting") + .field(&"") + .finish(), + Self::Planning => f.debug_tuple("ParquetMorselPlanner::Planning").finish(), + Self::Errored => f.debug_tuple("ParquetMorselPlanner::Errored").finish(), + } + } +} + +impl ParquetMorselPlanner { + fn try_new(morselizer: &ParquetMorselizer, file: PartitionedFile) -> Result { + let prepared = morselizer.prepare_open_file(file)?; + #[cfg(feature = "parquet_encryption")] + let state = ParquetOpenState::Start { + prepared: Box::new(prepared), + encryption_context: Arc::new(morselizer.get_encryption_context()), + }; + #[cfg(not(feature = "parquet_encryption"))] + let state = ParquetOpenState::Start { + prepared: Box::new(prepared), + }; + Ok(Self::Ready(state)) + } + + /// Schedule an I/O future that resolves to the planner's next owned state. + /// + /// This helper + /// + /// 1. creates a channel to send the next [`ParquetOpenState`] back to the + /// planner once the I/O future completes, + /// + /// 2. transitions the planner into [`ParquetMorselPlanner::Waiting`] + /// + /// 3. returns a [`MorselPlan`] containing the boxed I/O future for the + /// caller to poll. + /// + fn schedule_io(&mut self, future: F) -> MorselPlan + where + F: Future> + Send + 'static, + { + let (output_for_future, output) = mpsc::channel(); + let io_future = async move { + let next_state = future.await?; + output_for_future.send(Ok(next_state)).map_err(|e| { + DataFusionError::Execution(format!("failed to send planner output: {e}")) + })?; + Ok(()) + } + .boxed(); + *self = ParquetMorselPlanner::Waiting(output); + MorselPlan::new().with_io_future(io_future) + } +} + +impl MorselPlanner for ParquetMorselPlanner { + fn plan(&mut self) -> Result> { + loop { + let planner = mem::replace(self, ParquetMorselPlanner::Planning); + let state = match planner { + ParquetMorselPlanner::Ready(state) => state, + ParquetMorselPlanner::Waiting(output) => { + output + .try_recv() + .map_err(|e| { + // IO wasn't done + *self = ParquetMorselPlanner::Errored; + match e { + TryRecvError::Empty => internal_datafusion_err!( + "planner polled before I/O completed" + ), + TryRecvError::Disconnected => internal_datafusion_err!( + "planner polled after I/O disconnected" + ), + } + })? + .inspect_err(|_| { + // IO completed successfully, but the IO was an error + *self = ParquetMorselPlanner::Errored; + })? + } + ParquetMorselPlanner::Planning => { + return internal_err!( + "ParquetMorselPlanner::plan was re-entered before previous plan completed" + ); + } + ParquetMorselPlanner::Errored => { + return internal_err!( + "ParquetMorselPlanner::plan called after a previous error" + ); + } + }; + // check for end of stream + if let ParquetOpenState::Done = state { + *self = ParquetMorselPlanner::Ready(ParquetOpenState::Done); + return Ok(None); + }; + + let state = state.transition().inspect_err(|_| { + *self = ParquetMorselPlanner::Errored; + })?; match state { #[cfg(feature = "parquet_encryption")] - ParquetOpenState::LoadEncryption(mut future) => { - state = match future.poll_unpin(cx) { - Poll::Ready(result) => ParquetOpenState::PruneFile(result?), - Poll::Pending => { - self.state = ParquetOpenState::LoadEncryption(future); - return Poll::Pending; - } - }; + ParquetOpenState::LoadEncryption(future) => { + return Ok(Some(self.schedule_io(async move { + Ok(ParquetOpenState::PruneFile(future.await?)) + }))); } - ParquetOpenState::LoadMetadata(mut future) => { - state = match future.poll_unpin(cx) { - Poll::Ready(result) => { - ParquetOpenState::PrepareFilters(Box::new(result?)) - } - Poll::Pending => { - self.state = ParquetOpenState::LoadMetadata(future); - return Poll::Pending; - } - }; + ParquetOpenState::LoadMetadata(future) => { + return Ok(Some(self.schedule_io(async move { + Ok(ParquetOpenState::PrepareFilters(Box::new(future.await?))) + }))); } - ParquetOpenState::LoadPageIndex(mut future) => { - state = match future.poll_unpin(cx) { - Poll::Ready(result) => { - ParquetOpenState::PruneWithStatistics(Box::new(result?)) - } - Poll::Pending => { - self.state = ParquetOpenState::LoadPageIndex(future); - return Poll::Pending; - } - }; + ParquetOpenState::LoadPageIndex(future) => { + return Ok(Some(self.schedule_io(async move { + Ok(ParquetOpenState::PruneWithStatistics(Box::new( + future.await?, + ))) + }))); } - ParquetOpenState::LoadBloomFilters(mut future) => { - state = match future.poll_unpin(cx) { - Poll::Ready(result) => { - ParquetOpenState::PruneWithBloomFilters(Box::new(result?)) - } - Poll::Pending => { - self.state = ParquetOpenState::LoadBloomFilters(future); - return Poll::Pending; - } - }; + ParquetOpenState::LoadBloomFilters(future) => { + return Ok(Some(self.schedule_io(async move { + Ok(ParquetOpenState::PruneWithBloomFilters(Box::new( + future.await?, + ))) + }))); } ParquetOpenState::Ready(stream) => { - return Poll::Ready(Ok(stream)); + let morsels: Vec> = + vec![Box::new(ParquetStreamMorsel::new(stream))]; + return Ok(Some(MorselPlan::new().with_morsels(morsels))); } - ParquetOpenState::Done => { - return Poll::Ready(Ok(futures::stream::empty().boxed())); + ParquetOpenState::Done => return Ok(None), + cpu_state => { + *self = ParquetMorselPlanner::Ready(cpu_state); } - - // For all other states, loop again and try to transition - // immediately. All states are explicitly listed here to ensure any - // new states are handled correctly - ParquetOpenState::Start { .. } => {} - ParquetOpenState::PruneFile(_) => {} - ParquetOpenState::PrepareFilters(_) => {} - ParquetOpenState::PruneWithStatistics(_) => {} - ParquetOpenState::PruneWithBloomFilters(_) => {} - ParquetOpenState::BuildStream(_) => {} - }; - - self.state = state; + } } } } -impl FileOpener for ParquetOpener { - fn open(&self, partitioned_file: PartitionedFile) -> Result { - let prepared = self.prepare_open_file(partitioned_file)?; - #[cfg(feature = "parquet_encryption")] - let future = ParquetOpenFuture::new(prepared, self.get_encryption_context()); - #[cfg(not(feature = "parquet_encryption"))] - let future = ParquetOpenFuture::new(prepared); - Ok(Box::pin(future)) - } -} - -impl ParquetOpener { +impl ParquetMorselizer { /// Perform the CPU-only setup for opening a parquet file. fn prepare_open_file( &self, @@ -1447,7 +1646,7 @@ impl EncryptionContext { } } -impl ParquetOpener { +impl ParquetMorselizer { #[cfg(feature = "parquet_encryption")] fn get_encryption_context(&self) -> EncryptionContext { EncryptionContext::new( @@ -1576,7 +1775,7 @@ fn should_enable_page_index( mod test { use std::sync::Arc; - use super::{ConstantColumns, constant_columns_from_stats}; + use super::{ConstantColumns, ParquetMorselizer, constant_columns_from_stats}; use crate::{DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; @@ -1731,11 +1930,12 @@ mod test { ProjectionExprs::from_indices(&all_indices, &file_schema) }; - ParquetOpener { + let morselizer = ParquetMorselizer { partition_index: self.partition_index, projection, batch_size: self.batch_size, limit: self.limit, + preserve_order: self.preserve_order, predicate: self.predicate, table_schema, metadata_size_hint: self.metadata_size_hint, @@ -1757,8 +1957,8 @@ mod test { encryption_factory: None, max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, - preserve_order: self.preserve_order, - } + }; + ParquetOpener { morselizer } } } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 64a339009e9cb..30c28507b0c92 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -23,8 +23,8 @@ use std::sync::Arc; use crate::DefaultParquetFileReaderFactory; use crate::ParquetFileReaderFactory; -use crate::opener::ParquetOpener; use crate::opener::build_pruning_predicates; +use crate::opener::{ParquetMorselizer, ParquetOpener}; use crate::row_filter::can_expr_be_pushed_down_with_schemas; use datafusion_common::config::ConfigOptions; #[cfg(feature = "parquet_encryption")] @@ -543,32 +543,34 @@ impl FileSource for ParquetSource { .map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap()); let opener = Arc::new(ParquetOpener { - partition_index: partition, - projection: self.projection.clone(), - batch_size: self - .batch_size - .expect("Batch size must set before creating ParquetOpener"), - limit: base_config.limit, - preserve_order: base_config.preserve_order, - predicate: self.predicate.clone(), - table_schema: self.table_schema.clone(), - metadata_size_hint: self.metadata_size_hint, - metrics: self.metrics().clone(), - parquet_file_reader_factory, - pushdown_filters: self.pushdown_filters(), - reorder_filters: self.reorder_filters(), - force_filter_selections: self.force_filter_selections(), - enable_page_index: self.enable_page_index(), - enable_bloom_filter: self.bloom_filter_on_read(), - enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, - coerce_int96, - #[cfg(feature = "parquet_encryption")] - file_decryption_properties, - expr_adapter_factory, - #[cfg(feature = "parquet_encryption")] - encryption_factory: self.get_encryption_factory_with_config(), - max_predicate_cache_size: self.max_predicate_cache_size(), - reverse_row_groups: self.reverse_row_groups, + morselizer: ParquetMorselizer { + partition_index: partition, + projection: self.projection.clone(), + batch_size: self + .batch_size + .expect("Batch size must set before creating ParquetOpener"), + limit: base_config.limit, + preserve_order: base_config.preserve_order, + predicate: self.predicate.clone(), + table_schema: self.table_schema.clone(), + metadata_size_hint: self.metadata_size_hint, + metrics: self.metrics().clone(), + parquet_file_reader_factory, + pushdown_filters: self.pushdown_filters(), + reorder_filters: self.reorder_filters(), + force_filter_selections: self.force_filter_selections(), + enable_page_index: self.enable_page_index(), + enable_bloom_filter: self.bloom_filter_on_read(), + enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, + coerce_int96, + #[cfg(feature = "parquet_encryption")] + file_decryption_properties, + expr_adapter_factory, + #[cfg(feature = "parquet_encryption")] + encryption_factory: self.get_encryption_factory_with_config(), + max_predicate_cache_size: self.max_predicate_cache_size(), + reverse_row_groups: self.reverse_row_groups, + }, }); Ok(opener) } diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index bcc4627050d4a..a9600271c28ce 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -38,6 +38,7 @@ pub mod file_scan_config; pub mod file_sink_config; pub mod file_stream; pub mod memory; +pub mod morsel; pub mod projection; pub mod schema_adapter; pub mod sink; diff --git a/datafusion/datasource/src/morsel/mod.rs b/datafusion/datasource/src/morsel/mod.rs new file mode 100644 index 0000000000000..4150bc12b4b22 --- /dev/null +++ b/datafusion/datasource/src/morsel/mod.rs @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Structures for Morsel Driven IO. +//! +//! Morsel Driven IO is a technique for parallelizing the reading of large files +//! by dividing them into smaller "morsels" that are processed independently. +//! +//! It is inspired by the paper [Morsel-Driven Parallelism: A NUMA-Aware Query +//! Evaluation Framework for the Many-Core Age](https://db.in.tum.de/~leis/papers/morsels.pdf). + +use std::fmt::Debug; + +use crate::PartitionedFile; +use arrow::array::RecordBatch; +use datafusion_common::Result; +use futures::future::BoxFuture; +use futures::stream::BoxStream; + +/// A Morsel of work ready to resolve to a stream of [`RecordBatch`]es. +/// +/// This represents a single morsel of work that is ready to be processed. It +/// has all data necessary (does not need any I/O) and is ready to be turned +/// into a stream of [`RecordBatch`]es for processing by the execution engine. +pub trait Morsel: Send + Debug { + /// Consume this morsel and produce a stream of [`RecordBatch`]es for processing. + /// + /// This should not do any I/O work, such as reading from the file. + fn into_stream(self: Box) -> BoxStream<'static, Result>; +} + +/// A Morselizer takes a single [`PartitionedFile`] and creates the initial planner +/// for that file. +/// +/// This is the entry point for morsel driven I/O. +pub trait Morselizer: Send + Sync + Debug { + /// Return the initial [`MorselPlanner`] for this file. + /// + /// "Morselzing" a file may involve CPU work, such as parsing parquet + /// metadata and evaluating pruning predicates. It should NOT do any I/O + /// work, such as reading from the file. If I/O is required, it should + /// return a future that the caller can poll to drive the I/O work to + /// completion, and once the future is complete, the caller can call + /// `plan_file` again for a different file. + fn plan_file(&self, file: PartitionedFile) -> Result>; +} + +/// A Morsel Planner is responsible for creating morsels for a given scan. +/// +/// The [`MorselPlanner`] is the unit of I/O. There is only ever a single I/O +/// outstanding for a specific planner. DataFusion may run +/// multiple planners in parallel, which corresponds to multiple parallel +/// I/O requests. +/// +/// It is not a Rust `Stream` so that it can explicitly separate CPU bound +/// work from I/O work. +/// +/// The design is similar to `ParquetPushDecoder`: when `plan` is called, it +/// should do CPU work to produce the next morsels or discover the next I/O +/// phase. +/// +/// Best practice is to spawn I/O in a Tokio task on a separate runtime to +/// ensure that CPU work doesn't block or slow down I/O work, but this is not +/// strictly required by the API. +pub trait MorselPlanner: Send + Debug { + /// Attempt to plan morsels. This may involve CPU work, such as parsing + /// parquet metadata and evaluating pruning predicates. + /// + /// It should NOT do any I/O work, such as reading from the file. If I/O is + /// required, the returned [`MorselPlan`] should contain a future that the + /// caller polls to drive the I/O work to completion. Once the future is + /// complete, the caller can call `plan` again to get the next morsels. + /// + /// Note this function is **not async** to make it explicitly clear that if + /// I/O is required, it should be done in the returned `io_future`. + /// + /// Returns `None` if the planner has no more work to do. + /// + /// # Empty Morsel Plans + /// + /// It may return `None`, which means no batches will be read from the file + /// (e.g. due to late-pruning based on statistics). + /// + /// # Output Ordering + /// + /// See the comments on [`MorselPlan`] for the logical output order. + fn plan(&mut self) -> Result>; +} + +/// Return result of [`MorselPlanner::plan`]. +/// +/// # Logical Ordering +/// +/// For plans where the output order of rows is maintained, the output order of +/// a [`MorselPlanner`] is logically defined as follows: +/// 1. All morsels that are directly produced +/// 2. Recursively, all morsels produced by the returned `planners` +#[derive(Default)] +pub struct MorselPlan { + /// Any morsels that are ready for processing. + morsels: Vec>, + /// Any newly-created planners that are ready for CPU work. + planners: Vec>, + /// A future that will drive any I/O work to completion. + /// + /// DataFusion will poll this future occasionally to drive the I/O work to + /// completion. Once the future resolves, DataFusion will call `plan` again + /// to get the next morsels. + io_future: Option>>, +} + +impl MorselPlan { + /// Create an empty morsel plan. + pub fn new() -> Self { + Self::default() + } + + /// Set the ready morsels. + pub fn with_morsels(mut self, morsels: Vec>) -> Self { + self.morsels = morsels; + self + } + + /// Set the ready child planners. + pub fn with_planners(mut self, planners: Vec>) -> Self { + self.planners = planners; + self + } + + /// Set the pending I/O future. + pub fn with_io_future(mut self, io_future: BoxFuture<'static, Result<()>>) -> Self { + self.io_future = Some(io_future); + self + } + + /// Take the ready morsels. + pub fn take_morsels(&mut self) -> Vec> { + std::mem::take(&mut self.morsels) + } + + /// Take the ready child planners. + pub fn take_planners(&mut self) -> Vec> { + std::mem::take(&mut self.planners) + } + + /// Take the pending I/O future, if any. + pub fn take_io_future(&mut self) -> Option>> { + self.io_future.take() + } + + /// Set the pending I/O future. + pub fn set_io_future(&mut self, io_future: BoxFuture<'static, Result<()>>) { + self.io_future = Some(io_future); + } + + /// Returns `true` if this plan contains an I/O future. + pub fn has_io_future(&self) -> bool { + self.io_future.is_some() + } +} From f843f75462c59726f2f0aa1b456e9337fbac2e02 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 8 Apr 2026 08:42:50 -0400 Subject: [PATCH 2/9] Review feedback: comments, use oneshot channel, wrapper for morsel --- datafusion/datasource-parquet/src/opener.rs | 38 +++++++++---------- datafusion/datasource/src/morsel/mod.rs | 42 +++++++++++++++++---- 2 files changed, 53 insertions(+), 27 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index dcecce51286c3..c916acffb140c 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -29,7 +29,9 @@ use arrow::datatypes::DataType; use datafusion_common::internal_datafusion_err; use datafusion_common::internal_err; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; -use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; +use datafusion_datasource::morsel::{ + Morsel, MorselPlan, MorselPlanner, Morselizer, PendingMorselizationIO, +}; use datafusion_physical_expr::projection::{ProjectionExprs, Projector}; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr_adapter::replace_columns_with_literals; @@ -39,8 +41,6 @@ use std::future::Future; use std::mem; use std::pin::Pin; use std::sync::Arc; -use std::sync::mpsc; -use std::sync::mpsc::{Receiver, TryRecvError}; use std::task::{Context, Poll}; use arrow::datatypes::{Schema, SchemaRef, TimeUnit}; @@ -65,6 +65,7 @@ use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate}; use datafusion_common::config::EncryptionFactoryOptions; #[cfg(feature = "parquet_encryption")] use datafusion_execution::parquet_encryption::EncryptionFactory; +use futures::channel::oneshot; use futures::{ FutureExt, Stream, StreamExt, future::BoxFuture, ready, stream::BoxStream, }; @@ -431,7 +432,7 @@ impl ParquetOpenState { /// Implements state machine described in [`ParquetOpenState`] struct ParquetOpenFuture { planner: Box, - pending_io: Option>>, + pending_io: Option, ready_morsels: VecDeque>, } @@ -521,7 +522,7 @@ enum ParquetMorselPlanner { /// /// Doing so is a protocol violation and transitions the planner to /// [`ParquetMorselPlanner::Errored`]. - Waiting(Receiver>), + Waiting(oneshot::Receiver>), /// Actively planning (this state should be replaced by end of the call to plan) Planning, /// An earlier planning attempt returned an error. @@ -576,12 +577,11 @@ impl ParquetMorselPlanner { where F: Future> + Send + 'static, { - let (output_for_future, output) = mpsc::channel(); + let (output_for_future, output) = oneshot::channel(); let io_future = async move { let next_state = future.await?; - output_for_future.send(Ok(next_state)).map_err(|e| { - DataFusionError::Execution(format!("failed to send planner output: {e}")) - })?; + // Ignore if receiver has been dropped (e.g. due to plan cancel) + let _ = output_for_future.send(Ok(next_state)); Ok(()) } .boxed(); @@ -596,21 +596,19 @@ impl MorselPlanner for ParquetMorselPlanner { let planner = mem::replace(self, ParquetMorselPlanner::Planning); let state = match planner { ParquetMorselPlanner::Ready(state) => state, - ParquetMorselPlanner::Waiting(output) => { + ParquetMorselPlanner::Waiting(mut output) => { output .try_recv() - .map_err(|e| { - // IO wasn't done + .map_err(|_: oneshot::Canceled| { *self = ParquetMorselPlanner::Errored; - match e { - TryRecvError::Empty => internal_datafusion_err!( - "planner polled before I/O completed" - ), - TryRecvError::Disconnected => internal_datafusion_err!( - "planner polled after I/O disconnected" - ), - } + internal_datafusion_err!( + "planner polled after I/O disconnected" + ) })? + .unwrap_or_else(|| { + *self = ParquetMorselPlanner::Errored; + internal_err!("planner polled before I/O completed") + }) .inspect_err(|_| { // IO completed successfully, but the IO was an error *self = ParquetMorselPlanner::Errored; diff --git a/datafusion/datasource/src/morsel/mod.rs b/datafusion/datasource/src/morsel/mod.rs index 4150bc12b4b22..9a2491e95361e 100644 --- a/datafusion/datasource/src/morsel/mod.rs +++ b/datafusion/datasource/src/morsel/mod.rs @@ -23,13 +23,14 @@ //! It is inspired by the paper [Morsel-Driven Parallelism: A NUMA-Aware Query //! Evaluation Framework for the Many-Core Age](https://db.in.tum.de/~leis/papers/morsels.pdf). -use std::fmt::Debug; - use crate::PartitionedFile; use arrow::array::RecordBatch; use datafusion_common::Result; use futures::future::BoxFuture; use futures::stream::BoxStream; +use std::fmt::Debug; +use std::pin::Pin; +use std::task::{Context, Poll}; /// A Morsel of work ready to resolve to a stream of [`RecordBatch`]es. /// @@ -39,10 +40,37 @@ use futures::stream::BoxStream; pub trait Morsel: Send + Debug { /// Consume this morsel and produce a stream of [`RecordBatch`]es for processing. /// - /// This should not do any I/O work, such as reading from the file. + /// Note: This may do CPU work to decode already-loaded data, but should not + /// do any I/O work such as reading from the file. fn into_stream(self: Box) -> BoxStream<'static, Result>; } +/// Wrapper for I/O that must complete before planning can continue. +pub struct PendingMorselizationIO { + future: BoxFuture<'static, Result<()>>, +} + +impl PendingMorselizationIO { + /// Create a new pending morselization I/O future. + pub fn new(future: BoxFuture<'static, Result<()>>) -> Self { + Self { future } + } + + /// Consume this wrapper and return the underlying future. + pub fn into_future(self) -> BoxFuture<'static, Result<()>> { + self.future + } +} + +/// PendingMorselizationIO wraps the inner future +impl Future for PendingMorselizationIO { + type Output = Result<()>; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // forward request to inner + self.future.as_mut().poll(cx) + } +} + /// A Morselizer takes a single [`PartitionedFile`] and creates the initial planner /// for that file. /// @@ -120,7 +148,7 @@ pub struct MorselPlan { /// DataFusion will poll this future occasionally to drive the I/O work to /// completion. Once the future resolves, DataFusion will call `plan` again /// to get the next morsels. - io_future: Option>>, + io_future: Option, } impl MorselPlan { @@ -143,7 +171,7 @@ impl MorselPlan { /// Set the pending I/O future. pub fn with_io_future(mut self, io_future: BoxFuture<'static, Result<()>>) -> Self { - self.io_future = Some(io_future); + self.io_future = Some(PendingMorselizationIO::new(io_future)); self } @@ -158,13 +186,13 @@ impl MorselPlan { } /// Take the pending I/O future, if any. - pub fn take_io_future(&mut self) -> Option>> { + pub fn take_io_future(&mut self) -> Option { self.io_future.take() } /// Set the pending I/O future. pub fn set_io_future(&mut self, io_future: BoxFuture<'static, Result<()>>) { - self.io_future = Some(io_future); + self.io_future = Some(PendingMorselizationIO::new(io_future)); } /// Returns `true` if this plan contains an I/O future. From 5a22e2daac15a6f5f1a60893681aff0828d21513 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 8 Apr 2026 08:51:58 -0400 Subject: [PATCH 3/9] Clean up future creation --- datafusion/datasource-parquet/src/opener.rs | 3 +- datafusion/datasource/src/morsel/mod.rs | 32 +++++++++++++++++---- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index c916acffb140c..3acbf6a9d52c1 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -583,8 +583,7 @@ impl ParquetMorselPlanner { // Ignore if receiver has been dropped (e.g. due to plan cancel) let _ = output_for_future.send(Ok(next_state)); Ok(()) - } - .boxed(); + }; *self = ParquetMorselPlanner::Waiting(output); MorselPlan::new().with_io_future(io_future) } diff --git a/datafusion/datasource/src/morsel/mod.rs b/datafusion/datasource/src/morsel/mod.rs index 9a2491e95361e..31b0e5d581536 100644 --- a/datafusion/datasource/src/morsel/mod.rs +++ b/datafusion/datasource/src/morsel/mod.rs @@ -26,6 +26,7 @@ use crate::PartitionedFile; use arrow::array::RecordBatch; use datafusion_common::Result; +use futures::FutureExt; use futures::future::BoxFuture; use futures::stream::BoxStream; use std::fmt::Debug; @@ -51,9 +52,24 @@ pub struct PendingMorselizationIO { } impl PendingMorselizationIO { - /// Create a new pending morselization I/O future. - pub fn new(future: BoxFuture<'static, Result<()>>) -> Self { - Self { future } + /// Create a new pending morselization I/O future + /// + /// Example + /// ``` + /// # use datafusion_datasource::morsel::PendingMorselizationIO; + /// let work = async move { + /// // Do I/O work here + /// Ok(()) + /// }; + /// let pending_io = PendingMorselizationIO::new(work); + /// ``` + pub fn new(future: F) -> Self + where + F: Future> + Send + 'static, + { + Self { + future: future.boxed(), + } } /// Consume this wrapper and return the underlying future. @@ -170,7 +186,10 @@ impl MorselPlan { } /// Set the pending I/O future. - pub fn with_io_future(mut self, io_future: BoxFuture<'static, Result<()>>) -> Self { + pub fn with_io_future(mut self, io_future: F) -> Self + where + F: Future> + Send + 'static, + { self.io_future = Some(PendingMorselizationIO::new(io_future)); self } @@ -191,7 +210,10 @@ impl MorselPlan { } /// Set the pending I/O future. - pub fn set_io_future(&mut self, io_future: BoxFuture<'static, Result<()>>) { + pub fn set_io_future(&mut self, io_future: F) + where + F: Future> + Send + 'static, + { self.io_future = Some(PendingMorselizationIO::new(io_future)); } From 454989bfb21302d8018f3f20ef775b29e396baf2 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 8 Apr 2026 09:10:05 -0400 Subject: [PATCH 4/9] Encode planning IO in type system --- datafusion/datasource-parquet/src/opener.rs | 75 +++--------- datafusion/datasource/src/morsel/mod.rs | 124 ++++++++++---------- 2 files changed, 80 insertions(+), 119 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 3acbf6a9d52c1..fc68d93386019 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -26,11 +26,10 @@ use crate::{ }; use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; -use datafusion_common::internal_datafusion_err; use datafusion_common::internal_err; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::morsel::{ - Morsel, MorselPlan, MorselPlanner, Morselizer, PendingMorselizationIO, + Morsel, MorselPlan, MorselPlanner, Morselizer, PendingMorselPlanner, }; use datafusion_physical_expr::projection::{ProjectionExprs, Projector}; use datafusion_physical_expr::utils::reassign_expr_columns; @@ -65,7 +64,6 @@ use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate}; use datafusion_common::config::EncryptionFactoryOptions; #[cfg(feature = "parquet_encryption")] use datafusion_execution::parquet_encryption::EncryptionFactory; -use futures::channel::oneshot; use futures::{ FutureExt, Stream, StreamExt, future::BoxFuture, ready, stream::BoxStream, }; @@ -432,7 +430,7 @@ impl ParquetOpenState { /// Implements state machine described in [`ParquetOpenState`] struct ParquetOpenFuture { planner: Box, - pending_io: Option, + pending_io: Option, ready_morsels: VecDeque>, } @@ -456,7 +454,8 @@ impl Future for ParquetOpenFuture { loop { // If waiting on IO, poll if let Some(io_future) = self.pending_io.as_mut() { - ready!(io_future.poll_unpin(cx))?; + let planner = ready!(io_future.poll_unpin(cx))?; + self.planner = planner; self.pending_io = None; } @@ -470,7 +469,7 @@ impl Future for ParquetOpenFuture { return Poll::Ready(Ok(futures::stream::empty().boxed())); }; - let child_planners = plan.take_planners(); + let child_planners = plan.take_ready_planners(); if !child_planners.is_empty() { return Poll::Ready(internal_err!( "Parquet FileOpener adapter does not support child morsel planners" @@ -479,7 +478,7 @@ impl Future for ParquetOpenFuture { self.ready_morsels = plan.take_morsels().into(); - if let Some(io_future) = plan.take_io_future() { + if let Some(io_future) = plan.take_pending_planner() { self.pending_io = Some(io_future); } } @@ -514,15 +513,6 @@ impl Morsel for ParquetStreamMorsel { enum ParquetMorselPlanner { /// Ready to perform CPU-only planning work. Ready(ParquetOpenState), - /// Waiting for an I/O future to produce the next planner state. - /// - /// Callers must not call [`MorselPlanner::plan`] again until the - /// corresponding I/O future has completed and its result is ready to - /// receive from the channel. - /// - /// Doing so is a protocol violation and transitions the planner to - /// [`ParquetMorselPlanner::Errored`]. - Waiting(oneshot::Receiver>), /// Actively planning (this state should be replaced by end of the call to plan) Planning, /// An earlier planning attempt returned an error. @@ -536,10 +526,6 @@ impl fmt::Debug for ParquetMorselPlanner { .debug_tuple("ParquetMorselPlanner::Ready") .field(state) .finish(), - Self::Waiting(_) => f - .debug_tuple("ParquetMorselPlanner::Waiting") - .field(&"") - .finish(), Self::Planning => f.debug_tuple("ParquetMorselPlanner::Planning").finish(), Self::Errored => f.debug_tuple("ParquetMorselPlanner::Errored").finish(), } @@ -561,31 +547,24 @@ impl ParquetMorselPlanner { Ok(Self::Ready(state)) } - /// Schedule an I/O future that resolves to the planner's next owned state. + /// Schedule an I/O future that resolves to the next planner to run. /// /// This helper /// - /// 1. creates a channel to send the next [`ParquetOpenState`] back to the - /// planner once the I/O future completes, + /// 1. drives one I/O phase to completion + /// 2. wraps the resulting state in a new [`ParquetMorselPlanner`] + /// 3. returns a [`MorselPlan`] containing the boxed future for the caller + /// to poll /// - /// 2. transitions the planner into [`ParquetMorselPlanner::Waiting`] - /// - /// 3. returns a [`MorselPlan`] containing the boxed I/O future for the - /// caller to poll. - /// - fn schedule_io(&mut self, future: F) -> MorselPlan + fn schedule_io(future: F) -> MorselPlan where F: Future> + Send + 'static, { - let (output_for_future, output) = oneshot::channel(); let io_future = async move { let next_state = future.await?; - // Ignore if receiver has been dropped (e.g. due to plan cancel) - let _ = output_for_future.send(Ok(next_state)); - Ok(()) + Ok(Box::new(ParquetMorselPlanner::Ready(next_state)) as _) }; - *self = ParquetMorselPlanner::Waiting(output); - MorselPlan::new().with_io_future(io_future) + MorselPlan::new().with_pending_planner(io_future) } } @@ -595,24 +574,6 @@ impl MorselPlanner for ParquetMorselPlanner { let planner = mem::replace(self, ParquetMorselPlanner::Planning); let state = match planner { ParquetMorselPlanner::Ready(state) => state, - ParquetMorselPlanner::Waiting(mut output) => { - output - .try_recv() - .map_err(|_: oneshot::Canceled| { - *self = ParquetMorselPlanner::Errored; - internal_datafusion_err!( - "planner polled after I/O disconnected" - ) - })? - .unwrap_or_else(|| { - *self = ParquetMorselPlanner::Errored; - internal_err!("planner polled before I/O completed") - }) - .inspect_err(|_| { - // IO completed successfully, but the IO was an error - *self = ParquetMorselPlanner::Errored; - })? - } ParquetMorselPlanner::Planning => { return internal_err!( "ParquetMorselPlanner::plan was re-entered before previous plan completed" @@ -637,24 +598,24 @@ impl MorselPlanner for ParquetMorselPlanner { match state { #[cfg(feature = "parquet_encryption")] ParquetOpenState::LoadEncryption(future) => { - return Ok(Some(self.schedule_io(async move { + return Ok(Some(Self::schedule_io(async move { Ok(ParquetOpenState::PruneFile(future.await?)) }))); } ParquetOpenState::LoadMetadata(future) => { - return Ok(Some(self.schedule_io(async move { + return Ok(Some(Self::schedule_io(async move { Ok(ParquetOpenState::PrepareFilters(Box::new(future.await?))) }))); } ParquetOpenState::LoadPageIndex(future) => { - return Ok(Some(self.schedule_io(async move { + return Ok(Some(Self::schedule_io(async move { Ok(ParquetOpenState::PruneWithStatistics(Box::new( future.await?, ))) }))); } ParquetOpenState::LoadBloomFilters(future) => { - return Ok(Some(self.schedule_io(async move { + return Ok(Some(Self::schedule_io(async move { Ok(ParquetOpenState::PruneWithBloomFilters(Box::new( future.await?, ))) diff --git a/datafusion/datasource/src/morsel/mod.rs b/datafusion/datasource/src/morsel/mod.rs index 31b0e5d581536..ad0e060da1623 100644 --- a/datafusion/datasource/src/morsel/mod.rs +++ b/datafusion/datasource/src/morsel/mod.rs @@ -46,47 +46,6 @@ pub trait Morsel: Send + Debug { fn into_stream(self: Box) -> BoxStream<'static, Result>; } -/// Wrapper for I/O that must complete before planning can continue. -pub struct PendingMorselizationIO { - future: BoxFuture<'static, Result<()>>, -} - -impl PendingMorselizationIO { - /// Create a new pending morselization I/O future - /// - /// Example - /// ``` - /// # use datafusion_datasource::morsel::PendingMorselizationIO; - /// let work = async move { - /// // Do I/O work here - /// Ok(()) - /// }; - /// let pending_io = PendingMorselizationIO::new(work); - /// ``` - pub fn new(future: F) -> Self - where - F: Future> + Send + 'static, - { - Self { - future: future.boxed(), - } - } - - /// Consume this wrapper and return the underlying future. - pub fn into_future(self) -> BoxFuture<'static, Result<()>> { - self.future - } -} - -/// PendingMorselizationIO wraps the inner future -impl Future for PendingMorselizationIO { - type Output = Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // forward request to inner - self.future.as_mut().poll(cx) - } -} - /// A Morselizer takes a single [`PartitionedFile`] and creates the initial planner /// for that file. /// @@ -155,16 +114,16 @@ pub trait MorselPlanner: Send + Debug { /// 2. Recursively, all morsels produced by the returned `planners` #[derive(Default)] pub struct MorselPlan { - /// Any morsels that are ready for processing. + /// Morsels ready for CPU work morsels: Vec>, - /// Any newly-created planners that are ready for CPU work. - planners: Vec>, - /// A future that will drive any I/O work to completion. + /// Planners that are ready for CPU work. + ready_planners: Vec>, + /// A future that is doing IO that will resolve to MorselPlanner /// /// DataFusion will poll this future occasionally to drive the I/O work to /// completion. Once the future resolves, DataFusion will call `plan` again /// to get the next morsels. - io_future: Option, + pending_planner: Option, } impl MorselPlan { @@ -181,44 +140,85 @@ impl MorselPlan { /// Set the ready child planners. pub fn with_planners(mut self, planners: Vec>) -> Self { - self.planners = planners; + self.ready_planners = planners; self } - /// Set the pending I/O future. - pub fn with_io_future(mut self, io_future: F) -> Self + /// Set the pending future for planning + pub fn with_pending_planner(mut self, io_future: F) -> Self where - F: Future> + Send + 'static, + F: Future>> + Send + 'static, { - self.io_future = Some(PendingMorselizationIO::new(io_future)); + self.pending_planner = Some(PendingMorselPlanner::new(io_future)); self } + /// Set the pending future for planning + pub fn set_pending_planner(&mut self, io_future: F) + where + F: Future>> + Send + 'static, + { + self.pending_planner = Some(PendingMorselPlanner::new(io_future)); + } + /// Take the ready morsels. pub fn take_morsels(&mut self) -> Vec> { std::mem::take(&mut self.morsels) } /// Take the ready child planners. - pub fn take_planners(&mut self) -> Vec> { - std::mem::take(&mut self.planners) + pub fn take_ready_planners(&mut self) -> Vec> { + std::mem::take(&mut self.ready_planners) } /// Take the pending I/O future, if any. - pub fn take_io_future(&mut self) -> Option { - self.io_future.take() + pub fn take_pending_planner(&mut self) -> Option { + self.pending_planner.take() + } + + /// Returns `true` if this plan contains an I/O future. + pub fn has_io_future(&self) -> bool { + self.pending_planner.is_some() } +} + +/// Wrapper for I/O that must complete before planning can continue. +pub struct PendingMorselPlanner { + future: BoxFuture<'static, Result>>, +} - /// Set the pending I/O future. - pub fn set_io_future(&mut self, io_future: F) +impl PendingMorselPlanner { + /// Create a new pending morselization I/O future + /// + /// Example + /// ``` + /// # use datafusion_datasource::morsel::PendingMorselPlanner; + /// let work = async move { + /// // Do I/O work here + /// # unimplemented!() + /// }; + /// let pending_io = PendingMorselPlanner::new(work); + /// ``` + pub fn new(future: F) -> Self where - F: Future> + Send + 'static, + F: Future>> + Send + 'static, { - self.io_future = Some(PendingMorselizationIO::new(io_future)); + Self { + future: future.boxed(), + } } - /// Returns `true` if this plan contains an I/O future. - pub fn has_io_future(&self) -> bool { - self.io_future.is_some() + /// Consume this wrapper and return the underlying future. + pub fn into_future(self) -> BoxFuture<'static, Result>> { + self.future + } +} + +/// wraps the inner future +impl Future for PendingMorselPlanner { + type Output = Result>; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // forward request to inner + self.future.as_mut().poll(cx) } } From 90c3e689fe47d4146f2551f283ba3138e431174a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 8 Apr 2026 09:22:15 -0400 Subject: [PATCH 5/9] Take Box --- datafusion/datasource-parquet/src/opener.rs | 137 +++++++++----------- datafusion/datasource/src/morsel/mod.rs | 2 +- 2 files changed, 65 insertions(+), 74 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index fc68d93386019..615fd92a1a6c4 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -429,7 +429,7 @@ impl ParquetOpenState { /// /// Implements state machine described in [`ParquetOpenState`] struct ParquetOpenFuture { - planner: Box, + planner: Option>, pending_io: Option, ready_morsels: VecDeque>, } @@ -440,7 +440,7 @@ impl ParquetOpenFuture { partitioned_file: PartitionedFile, ) -> Result { Ok(Self { - planner: morselizer.plan_file(partitioned_file)?, + planner: Some(morselizer.plan_file(partitioned_file)?), pending_io: None, ready_morsels: VecDeque::new(), }) @@ -454,9 +454,13 @@ impl Future for ParquetOpenFuture { loop { // If waiting on IO, poll if let Some(io_future) = self.pending_io.as_mut() { - let planner = ready!(io_future.poll_unpin(cx))?; - self.planner = planner; + let maybe_planner = ready!(io_future.poll_unpin(cx)); + // future has resolved. Clear pending io before processing the + // result to ensure that if the future returns an error, we + // don't end up in a state where both are set and accidentally + // ignore the error on the next poll self.pending_io = None; + self.planner = Some(maybe_planner?); } // have a morsel ready to go, return that @@ -464,17 +468,24 @@ impl Future for ParquetOpenFuture { return Poll::Ready(Ok(morsel.into_stream())); } - // Planner did not produce any stream (for example, it pruned the entire file) - let Some(mut plan) = self.planner.plan()? else { + let Some(planner) = self.planner.take() else { + // any path that leave planner as non + return Poll::Ready(internal_err!( + "ParquetOpenFuture polled after completion" + )); + }; + + let Some(mut plan) = planner.plan()? else { return Poll::Ready(Ok(futures::stream::empty().boxed())); }; - let child_planners = plan.take_ready_planners(); - if !child_planners.is_empty() { + let mut child_planners = plan.take_ready_planners(); + if child_planners.len() > 1 { return Poll::Ready(internal_err!( "Parquet FileOpener adapter does not support child morsel planners" )); } + self.planner = child_planners.pop(); self.ready_morsels = plan.take_morsels().into(); @@ -513,10 +524,6 @@ impl Morsel for ParquetStreamMorsel { enum ParquetMorselPlanner { /// Ready to perform CPU-only planning work. Ready(ParquetOpenState), - /// Actively planning (this state should be replaced by end of the call to plan) - Planning, - /// An earlier planning attempt returned an error. - Errored, } impl fmt::Debug for ParquetMorselPlanner { @@ -526,8 +533,6 @@ impl fmt::Debug for ParquetMorselPlanner { .debug_tuple("ParquetMorselPlanner::Ready") .field(state) .finish(), - Self::Planning => f.debug_tuple("ParquetMorselPlanner::Planning").finish(), - Self::Errored => f.debug_tuple("ParquetMorselPlanner::Errored").finish(), } } } @@ -569,67 +574,53 @@ impl ParquetMorselPlanner { } impl MorselPlanner for ParquetMorselPlanner { - fn plan(&mut self) -> Result> { - loop { - let planner = mem::replace(self, ParquetMorselPlanner::Planning); - let state = match planner { - ParquetMorselPlanner::Ready(state) => state, - ParquetMorselPlanner::Planning => { - return internal_err!( - "ParquetMorselPlanner::plan was re-entered before previous plan completed" - ); - } - ParquetMorselPlanner::Errored => { - return internal_err!( - "ParquetMorselPlanner::plan called after a previous error" - ); - } - }; - // check for end of stream - if let ParquetOpenState::Done = state { - *self = ParquetMorselPlanner::Ready(ParquetOpenState::Done); - return Ok(None); - }; + fn plan(self: Box) -> Result> { + let state = match *self { + ParquetMorselPlanner::Ready(state) => state, + }; - let state = state.transition().inspect_err(|_| { - *self = ParquetMorselPlanner::Errored; - })?; + if let ParquetOpenState::Done = state { + return Ok(None); + } - match state { - #[cfg(feature = "parquet_encryption")] - ParquetOpenState::LoadEncryption(future) => { - return Ok(Some(Self::schedule_io(async move { - Ok(ParquetOpenState::PruneFile(future.await?)) - }))); - } - ParquetOpenState::LoadMetadata(future) => { - return Ok(Some(Self::schedule_io(async move { - Ok(ParquetOpenState::PrepareFilters(Box::new(future.await?))) - }))); - } - ParquetOpenState::LoadPageIndex(future) => { - return Ok(Some(Self::schedule_io(async move { - Ok(ParquetOpenState::PruneWithStatistics(Box::new( - future.await?, - ))) - }))); - } - ParquetOpenState::LoadBloomFilters(future) => { - return Ok(Some(Self::schedule_io(async move { - Ok(ParquetOpenState::PruneWithBloomFilters(Box::new( - future.await?, - ))) - }))); - } - ParquetOpenState::Ready(stream) => { - let morsels: Vec> = - vec![Box::new(ParquetStreamMorsel::new(stream))]; - return Ok(Some(MorselPlan::new().with_morsels(morsels))); - } - ParquetOpenState::Done => return Ok(None), - cpu_state => { - *self = ParquetMorselPlanner::Ready(cpu_state); - } + let state = state.transition()?; + + match state { + #[cfg(feature = "parquet_encryption")] + ParquetOpenState::LoadEncryption(future) => { + Ok(Some(Self::schedule_io(async move { + Ok(ParquetOpenState::PruneFile(future.await?)) + }))) + } + ParquetOpenState::LoadMetadata(future) => { + Ok(Some(Self::schedule_io(async move { + Ok(ParquetOpenState::PrepareFilters(Box::new(future.await?))) + }))) + } + ParquetOpenState::LoadPageIndex(future) => { + Ok(Some(Self::schedule_io(async move { + Ok(ParquetOpenState::PruneWithStatistics(Box::new( + future.await?, + ))) + }))) + } + ParquetOpenState::LoadBloomFilters(future) => { + Ok(Some(Self::schedule_io(async move { + Ok(ParquetOpenState::PruneWithBloomFilters(Box::new( + future.await?, + ))) + }))) + } + ParquetOpenState::Ready(stream) => { + let morsels: Vec> = + vec![Box::new(ParquetStreamMorsel::new(stream))]; + Ok(Some(MorselPlan::new().with_morsels(morsels))) + } + ParquetOpenState::Done => Ok(None), + cpu_state => { + Ok(Some(MorselPlan::new().with_planners(vec![Box::new( + ParquetMorselPlanner::Ready(cpu_state), + )]))) } } } diff --git a/datafusion/datasource/src/morsel/mod.rs b/datafusion/datasource/src/morsel/mod.rs index ad0e060da1623..9a2e2a8e95328 100644 --- a/datafusion/datasource/src/morsel/mod.rs +++ b/datafusion/datasource/src/morsel/mod.rs @@ -101,7 +101,7 @@ pub trait MorselPlanner: Send + Debug { /// # Output Ordering /// /// See the comments on [`MorselPlan`] for the logical output order. - fn plan(&mut self) -> Result>; + fn plan(self: Box) -> Result>; } /// Return result of [`MorselPlanner::plan`]. From 9264ec1b230407b2680fb5eb89042725e9b00de5 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 8 Apr 2026 09:25:53 -0400 Subject: [PATCH 6/9] Clippy --- datafusion/datasource-parquet/src/opener.rs | 36 ++++++++------------- 1 file changed, 14 insertions(+), 22 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 615fd92a1a6c4..03aeae9407535 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -520,20 +520,17 @@ impl Morsel for ParquetStreamMorsel { } } -/// Stateful planner for opening a single parquet file via the morsel APIs. -enum ParquetMorselPlanner { +/// Planner for opening a single parquet file via the morsel APIs. +struct ParquetMorselPlanner { /// Ready to perform CPU-only planning work. - Ready(ParquetOpenState), + state: ParquetOpenState, } impl fmt::Debug for ParquetMorselPlanner { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Ready(state) => f - .debug_tuple("ParquetMorselPlanner::Ready") - .field(state) - .finish(), - } + f.debug_tuple("ParquetMorselPlanner::Ready") + .field(&self.state) + .finish() } } @@ -549,7 +546,7 @@ impl ParquetMorselPlanner { let state = ParquetOpenState::Start { prepared: Box::new(prepared), }; - Ok(Self::Ready(state)) + Ok(Self { state }) } /// Schedule an I/O future that resolves to the next planner to run. @@ -567,7 +564,7 @@ impl ParquetMorselPlanner { { let io_future = async move { let next_state = future.await?; - Ok(Box::new(ParquetMorselPlanner::Ready(next_state)) as _) + Ok(Box::new(ParquetMorselPlanner { state: next_state }) as _) }; MorselPlan::new().with_pending_planner(io_future) } @@ -575,15 +572,11 @@ impl ParquetMorselPlanner { impl MorselPlanner for ParquetMorselPlanner { fn plan(self: Box) -> Result> { - let state = match *self { - ParquetMorselPlanner::Ready(state) => state, - }; - - if let ParquetOpenState::Done = state { + if let ParquetOpenState::Done = self.state { return Ok(None); } - let state = state.transition()?; + let state = self.state.transition()?; match state { #[cfg(feature = "parquet_encryption")] @@ -617,11 +610,10 @@ impl MorselPlanner for ParquetMorselPlanner { Ok(Some(MorselPlan::new().with_morsels(morsels))) } ParquetOpenState::Done => Ok(None), - cpu_state => { - Ok(Some(MorselPlan::new().with_planners(vec![Box::new( - ParquetMorselPlanner::Ready(cpu_state), - )]))) - } + cpu_state => Ok(Some( + MorselPlan::new() + .with_planners(vec![Box::new(Self { state: cpu_state })]), + )), } } } From d182ec176c914171ad6133520509addee963c5d3 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 8 Apr 2026 09:34:48 -0400 Subject: [PATCH 7/9] update comments --- datafusion/datasource-parquet/src/opener.rs | 21 +++++++----- datafusion/datasource/src/morsel/mod.rs | 38 +++++++++++---------- 2 files changed, 32 insertions(+), 27 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 03aeae9407535..a1de1ef0fdfee 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -427,7 +427,8 @@ impl ParquetOpenState { /// Adapter for a [`MorselPlanner`] to the [`FileOpener`] API /// -/// Implements state machine described in [`ParquetOpenState`] +/// Compatibility adapter that drives a morsel planner through the +/// [`FileOpener`] API. struct ParquetOpenFuture { planner: Option>, pending_io: Option, @@ -452,29 +453,31 @@ impl Future for ParquetOpenFuture { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - // If waiting on IO, poll + // If planner I/O completed, resume with the returned planner. if let Some(io_future) = self.pending_io.as_mut() { let maybe_planner = ready!(io_future.poll_unpin(cx)); - // future has resolved. Clear pending io before processing the - // result to ensure that if the future returns an error, we - // don't end up in a state where both are set and accidentally - // ignore the error on the next poll + // Clear `pending_io` before handling the result so an error + // cannot leave both continuation paths populated. self.pending_io = None; self.planner = Some(maybe_planner?); } - // have a morsel ready to go, return that + // If a stream morsel is ready, return it. if let Some(morsel) = self.ready_morsels.pop_front() { return Poll::Ready(Ok(morsel.into_stream())); } + // This shim must always own either a planner, a pending planner + // future, or a ready morsel. Reaching this branch means the + // continuation was lost. let Some(planner) = self.planner.take() else { - // any path that leave planner as non return Poll::Ready(internal_err!( "ParquetOpenFuture polled after completion" )); }; + // Planner completed without producing a stream morsel. + // (e.g. all row groups were pruned) let Some(mut plan) = planner.plan()? else { return Poll::Ready(Ok(futures::stream::empty().boxed())); }; @@ -520,7 +523,7 @@ impl Morsel for ParquetStreamMorsel { } } -/// Planner for opening a single parquet file via the morsel APIs. +/// Per-file planner that owns the current [`ParquetOpenState`]. struct ParquetMorselPlanner { /// Ready to perform CPU-only planning work. state: ParquetOpenState, diff --git a/datafusion/datasource/src/morsel/mod.rs b/datafusion/datasource/src/morsel/mod.rs index 9a2e2a8e95328..9ae64ff0e870a 100644 --- a/datafusion/datasource/src/morsel/mod.rs +++ b/datafusion/datasource/src/morsel/mod.rs @@ -53,12 +53,10 @@ pub trait Morsel: Send + Debug { pub trait Morselizer: Send + Sync + Debug { /// Return the initial [`MorselPlanner`] for this file. /// - /// "Morselzing" a file may involve CPU work, such as parsing parquet + /// Morselizing a file may involve CPU work, such as parsing parquet /// metadata and evaluating pruning predicates. It should NOT do any I/O - /// work, such as reading from the file. If I/O is required, it should - /// return a future that the caller can poll to drive the I/O work to - /// completion, and once the future is complete, the caller can call - /// `plan_file` again for a different file. + /// work, such as reading from the file. Any needed I/O should be done using + /// [`MorselPlan::with_pending_planner`]. fn plan_file(&self, file: PartitionedFile) -> Result>; } @@ -84,9 +82,9 @@ pub trait MorselPlanner: Send + Debug { /// parquet metadata and evaluating pruning predicates. /// /// It should NOT do any I/O work, such as reading from the file. If I/O is - /// required, the returned [`MorselPlan`] should contain a future that the - /// caller polls to drive the I/O work to completion. Once the future is - /// complete, the caller can call `plan` again to get the next morsels. + /// required, the returned [`MorselPlan`] should contain a pending planner + /// future that the caller polls to drive the I/O work to completion. Once + /// that future resolves, it yields a planner ready for work. /// /// Note this function is **not async** to make it explicitly clear that if /// I/O is required, it should be done in the returned `io_future`. @@ -118,11 +116,11 @@ pub struct MorselPlan { morsels: Vec>, /// Planners that are ready for CPU work. ready_planners: Vec>, - /// A future that is doing IO that will resolve to MorselPlanner + /// A future with planner I/O that resolves to a CPU ready planner. /// /// DataFusion will poll this future occasionally to drive the I/O work to - /// completion. Once the future resolves, DataFusion will call `plan` again - /// to get the next morsels. + /// completion. Once it resolves, planning continues with the returned + /// planner. pending_planner: Option, } @@ -144,7 +142,7 @@ impl MorselPlan { self } - /// Set the pending future for planning + /// Set the pending planner for an I/O phase. pub fn with_pending_planner(mut self, io_future: F) -> Self where F: Future>> + Send + 'static, @@ -153,7 +151,7 @@ impl MorselPlan { self } - /// Set the pending future for planning + /// Set the pending planner for an I/O phase. pub fn set_pending_planner(&mut self, io_future: F) where F: Future>> + Send + 'static, @@ -188,14 +186,18 @@ pub struct PendingMorselPlanner { } impl PendingMorselPlanner { - /// Create a new pending morselization I/O future + /// Create a new pending planner future. /// /// Example /// ``` - /// # use datafusion_datasource::morsel::PendingMorselPlanner; + /// # use datafusion_common::DataFusionError; + /// # use datafusion_datasource::morsel::{MorselPlanner, PendingMorselPlanner}; /// let work = async move { - /// // Do I/O work here - /// # unimplemented!() + /// let planner: Box = { + /// // Do I/O work here, then return the next planner to run. + /// # unimplemented!(); + /// }; + /// Ok(planner) as Result<_, DataFusionError>; /// }; /// let pending_io = PendingMorselPlanner::new(work); /// ``` @@ -214,7 +216,7 @@ impl PendingMorselPlanner { } } -/// wraps the inner future +/// Forwards polling to the underlying future. impl Future for PendingMorselPlanner { type Output = Result>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { From 556690f0af383451cd97175db285a90daa35f4cf Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 8 Apr 2026 14:48:26 -0400 Subject: [PATCH 8/9] Add experimental comments --- datafusion/datasource/src/morsel/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/datasource/src/morsel/mod.rs b/datafusion/datasource/src/morsel/mod.rs index 9ae64ff0e870a..5f200d7794690 100644 --- a/datafusion/datasource/src/morsel/mod.rs +++ b/datafusion/datasource/src/morsel/mod.rs @@ -17,6 +17,9 @@ //! Structures for Morsel Driven IO. //! +//! NOTE: As of DataFusion 54.0.0, these are experimental APIs that may change +//! substantially. +//! //! Morsel Driven IO is a technique for parallelizing the reading of large files //! by dividing them into smaller "morsels" that are processed independently. //! From 696a42dce0dc7b3d9f2324cc6481ef1056490216 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 8 Apr 2026 15:15:01 -0400 Subject: [PATCH 9/9] Add assert that planner is not overwritten --- datafusion/datasource-parquet/src/opener.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index a1de1ef0fdfee..35900e16c18ed 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -459,6 +459,11 @@ impl Future for ParquetOpenFuture { // Clear `pending_io` before handling the result so an error // cannot leave both continuation paths populated. self.pending_io = None; + if self.planner.is_some() { + return Poll::Ready(internal_err!( + "ParquetOpenFuture does not support concurrent planners" + )); + } self.planner = Some(maybe_planner?); }