Skip to content

Commit b2b8a23

Browse files
author
Bert Vermeiren
committed
Fix: filter with limit support raises internal error (tackled review comments)
1 parent 95b675e commit b2b8a23

2 files changed

Lines changed: 5 additions & 7 deletions

File tree

datafusion/physical-plan/src/coalesce/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ impl LimitedBatchCoalescer {
134134
Ok(())
135135
}
136136

137-
pub fn is_finished(&self) -> bool {
137+
pub(crate) fn is_finished(&self) -> bool {
138138
self.finished
139139
}
140140

datafusion/physical-plan/src/filter.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use super::{
2626
ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
2727
RecordBatchStream, SendableRecordBatchStream, Statistics,
2828
};
29-
use crate::coalesce::PushBatchStatus::LimitReached;
3029
use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
3130
use crate::common::can_project;
3231
use crate::execution_plan::CardinalityEffect;
@@ -752,7 +751,7 @@ impl Stream for FilterExecStream {
752751
) -> Poll<Option<Self::Item>> {
753752
let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone();
754753
loop {
755-
// If there is any completed batch ready, return it
754+
// If there is a completed batch ready, return it
756755
if let Some(batch) = self.batch_coalescer.next_completed_batch() {
757756
self.metrics.selectivity.add_part(batch.num_rows());
758757
let poll = Poll::Ready(Some(Ok(batch)));
@@ -761,8 +760,7 @@ impl Stream for FilterExecStream {
761760

762761
if self.batch_coalescer.is_finished() {
763762
// If input is done and no batches are ready, return None to signal end of stream.
764-
let poll = Poll::Ready(None);
765-
return self.metrics.baseline_metrics.record_poll(poll);
763+
return Poll::Ready(None);
766764
}
767765

768766
// Attempt to pull the next batch from the input stream.
@@ -806,9 +804,9 @@ impl Stream for FilterExecStream {
806804
PushBatchStatus::Continue => {
807805
// Keep pushing more batches
808806
}
809-
LimitReached => {
807+
PushBatchStatus::LimitReached => {
810808
// limit was reached, so stop early
811-
self.batch_coalescer.finish()?
809+
self.batch_coalescer.finish()?;
812810
// continue draining the coalescer
813811
}
814812
}

0 commit comments

Comments
 (0)