Skip to content

Commit 6e272cb

Browse files
committed
Take Box<Self>
1 parent ebc21ff commit 6e272cb

2 files changed

Lines changed: 65 additions & 74 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 64 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ impl ParquetOpenState {
429429
///
430430
/// Implements state machine described in [`ParquetOpenState`]
431431
struct ParquetOpenFuture {
432-
planner: Box<dyn MorselPlanner>,
432+
planner: Option<Box<dyn MorselPlanner>>,
433433
pending_io: Option<PendingMorselPlanner>,
434434
ready_morsels: VecDeque<Box<dyn Morsel>>,
435435
}
@@ -440,7 +440,7 @@ impl ParquetOpenFuture {
440440
partitioned_file: PartitionedFile,
441441
) -> Result<Self> {
442442
Ok(Self {
443-
planner: morselizer.plan_file(partitioned_file)?,
443+
planner: Some(morselizer.plan_file(partitioned_file)?),
444444
pending_io: None,
445445
ready_morsels: VecDeque::new(),
446446
})
@@ -454,27 +454,38 @@ impl Future for ParquetOpenFuture {
454454
loop {
455455
// If waiting on IO, poll
456456
if let Some(io_future) = self.pending_io.as_mut() {
457-
let planner = ready!(io_future.poll_unpin(cx))?;
458-
self.planner = planner;
457+
let maybe_planner = ready!(io_future.poll_unpin(cx));
458+
// future has resolved. Clear pending io before processing the
459+
// result to ensure that if the future returns an error, we
460+
// don't end up in a state where both are set and accidentally
461+
// ignore the error on the next poll
459462
self.pending_io = None;
463+
self.planner = Some(maybe_planner?);
460464
}
461465

462466
// have a morsel ready to go, return that
463467
if let Some(morsel) = self.ready_morsels.pop_front() {
464468
return Poll::Ready(Ok(morsel.into_stream()));
465469
}
466470

467-
// Planner did not produce any stream (for example, it pruned the entire file)
468-
let Some(mut plan) = self.planner.plan()? else {
471+
let Some(planner) = self.planner.take() else {
472+
// any path that leave planner as non
473+
return Poll::Ready(internal_err!(
474+
"ParquetOpenFuture polled after completion"
475+
));
476+
};
477+
478+
let Some(mut plan) = planner.plan()? else {
469479
return Poll::Ready(Ok(futures::stream::empty().boxed()));
470480
};
471481

472-
let child_planners = plan.take_ready_planners();
473-
if !child_planners.is_empty() {
482+
let mut child_planners = plan.take_ready_planners();
483+
if child_planners.len() > 1 {
474484
return Poll::Ready(internal_err!(
475485
"Parquet FileOpener adapter does not support child morsel planners"
476486
));
477487
}
488+
self.planner = child_planners.pop();
478489

479490
self.ready_morsels = plan.take_morsels().into();
480491

@@ -513,10 +524,6 @@ impl Morsel for ParquetStreamMorsel {
513524
enum ParquetMorselPlanner {
514525
/// Ready to perform CPU-only planning work.
515526
Ready(ParquetOpenState),
516-
/// Actively planning (this state should be replaced by end of the call to plan)
517-
Planning,
518-
/// An earlier planning attempt returned an error.
519-
Errored,
520527
}
521528

522529
impl fmt::Debug for ParquetMorselPlanner {
@@ -526,8 +533,6 @@ impl fmt::Debug for ParquetMorselPlanner {
526533
.debug_tuple("ParquetMorselPlanner::Ready")
527534
.field(state)
528535
.finish(),
529-
Self::Planning => f.debug_tuple("ParquetMorselPlanner::Planning").finish(),
530-
Self::Errored => f.debug_tuple("ParquetMorselPlanner::Errored").finish(),
531536
}
532537
}
533538
}
@@ -569,67 +574,53 @@ impl ParquetMorselPlanner {
569574
}
570575

571576
impl MorselPlanner for ParquetMorselPlanner {
572-
fn plan(&mut self) -> Result<Option<MorselPlan>> {
573-
loop {
574-
let planner = mem::replace(self, ParquetMorselPlanner::Planning);
575-
let state = match planner {
576-
ParquetMorselPlanner::Ready(state) => state,
577-
ParquetMorselPlanner::Planning => {
578-
return internal_err!(
579-
"ParquetMorselPlanner::plan was re-entered before previous plan completed"
580-
);
581-
}
582-
ParquetMorselPlanner::Errored => {
583-
return internal_err!(
584-
"ParquetMorselPlanner::plan called after a previous error"
585-
);
586-
}
587-
};
588-
// check for end of stream
589-
if let ParquetOpenState::Done = state {
590-
*self = ParquetMorselPlanner::Ready(ParquetOpenState::Done);
591-
return Ok(None);
592-
};
577+
fn plan(self: Box<Self>) -> Result<Option<MorselPlan>> {
578+
let state = match *self {
579+
ParquetMorselPlanner::Ready(state) => state,
580+
};
593581

594-
let state = state.transition().inspect_err(|_| {
595-
*self = ParquetMorselPlanner::Errored;
596-
})?;
582+
if let ParquetOpenState::Done = state {
583+
return Ok(None);
584+
}
597585

598-
match state {
599-
#[cfg(feature = "parquet_encryption")]
600-
ParquetOpenState::LoadEncryption(future) => {
601-
return Ok(Some(Self::schedule_io(async move {
602-
Ok(ParquetOpenState::PruneFile(future.await?))
603-
})));
604-
}
605-
ParquetOpenState::LoadMetadata(future) => {
606-
return Ok(Some(Self::schedule_io(async move {
607-
Ok(ParquetOpenState::PrepareFilters(Box::new(future.await?)))
608-
})));
609-
}
610-
ParquetOpenState::LoadPageIndex(future) => {
611-
return Ok(Some(Self::schedule_io(async move {
612-
Ok(ParquetOpenState::PruneWithStatistics(Box::new(
613-
future.await?,
614-
)))
615-
})));
616-
}
617-
ParquetOpenState::LoadBloomFilters(future) => {
618-
return Ok(Some(Self::schedule_io(async move {
619-
Ok(ParquetOpenState::PruneWithBloomFilters(Box::new(
620-
future.await?,
621-
)))
622-
})));
623-
}
624-
ParquetOpenState::Ready(stream) => {
625-
let morsels: Vec<Box<dyn Morsel>> =
626-
vec![Box::new(ParquetStreamMorsel::new(stream))];
627-
return Ok(Some(MorselPlan::new().with_morsels(morsels)));
628-
}
629-
ParquetOpenState::Done => return Ok(None),
630-
cpu_state => {
631-
*self = ParquetMorselPlanner::Ready(cpu_state);
632-
}
586+
let state = state.transition()?;
587+
588+
match state {
589+
#[cfg(feature = "parquet_encryption")]
590+
ParquetOpenState::LoadEncryption(future) => {
591+
Ok(Some(Self::schedule_io(async move {
592+
Ok(ParquetOpenState::PruneFile(future.await?))
593+
})))
594+
}
595+
ParquetOpenState::LoadMetadata(future) => {
596+
Ok(Some(Self::schedule_io(async move {
597+
Ok(ParquetOpenState::PrepareFilters(Box::new(future.await?)))
598+
})))
599+
}
600+
ParquetOpenState::LoadPageIndex(future) => {
601+
Ok(Some(Self::schedule_io(async move {
602+
Ok(ParquetOpenState::PruneWithStatistics(Box::new(
603+
future.await?,
604+
)))
605+
})))
606+
}
607+
ParquetOpenState::LoadBloomFilters(future) => {
608+
Ok(Some(Self::schedule_io(async move {
609+
Ok(ParquetOpenState::PruneWithBloomFilters(Box::new(
610+
future.await?,
611+
)))
612+
})))
613+
}
614+
ParquetOpenState::Ready(stream) => {
615+
let morsels: Vec<Box<dyn Morsel>> =
616+
vec![Box::new(ParquetStreamMorsel::new(stream))];
617+
Ok(Some(MorselPlan::new().with_morsels(morsels)))
618+
}
619+
ParquetOpenState::Done => Ok(None),
620+
cpu_state => {
621+
Ok(Some(MorselPlan::new().with_planners(vec![Box::new(
622+
ParquetMorselPlanner::Ready(cpu_state),
623+
)])))
633624
}
634625
}
635626
}

datafusion/datasource/src/morsel/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ pub trait MorselPlanner: Send + Debug {
101101
/// # Output Ordering
102102
///
103103
/// See the comments on [`MorselPlan`] for the logical output order.
104-
fn plan(&mut self) -> Result<Option<MorselPlan>>;
104+
fn plan(self: Box<Self>) -> Result<Option<MorselPlan>>;
105105
}
106106

107107
/// Return result of [`MorselPlanner::plan`].

0 commit comments

Comments
 (0)