Skip to content

Commit afdd4ea

Browse files
Tristan1900claude
andcommitted
Propagate BufferExec input panics instead of silently truncating
A panic while polling the input inside MemoryBufferedStream's producer task was caught by the tokio task harness, dropping the sender; the consumer then read the closed channel as a clean EOF and silently truncated this partition's output. Wrap the input poll in catch_unwind and forward the panic as a DataFusionError over the existing error channel, so it propagates and fails the query instead of being swallowed. Found in staging: a SinglePartitioned aggregate emitting a 16-byte FixedSizeBinary group-key array past Arrow's 2 GiB i32-offset limit panicked, yet the query "succeeded" with partial (under-counted) results. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 4d3dc91 commit afdd4ea

1 file changed

Lines changed: 43 additions & 6 deletions

File tree

datafusion/physical-plan/src/buffer.rs

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,11 @@ use datafusion_physical_expr_common::metrics::{
4040
};
4141
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
4242
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
43-
use futures::{Stream, StreamExt, TryStreamExt};
43+
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
4444
use pin_project_lite::pin_project;
4545
use std::any::Any;
4646
use std::fmt;
47+
use std::panic::AssertUnwindSafe;
4748
use std::pin::Pin;
4849
use std::sync::Arc;
4950
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -337,11 +338,24 @@ impl<T: Send + SizedMessage + 'static> MemoryBufferedStream<T> {
337338
let item_or_err = tokio::select! {
338339
biased;
339340
_ = batch_tx.closed() => break,
340-
item_or_err = input.next() => {
341-
let Some(item_or_err) = item_or_err else {
342-
break; // stream finished
343-
};
344-
item_or_err
341+
// Catch a panic in the input poll so it surfaces as a stream error
342+
// instead of dropping `batch_tx` and looking like a clean EOF.
343+
polled = AssertUnwindSafe(input.next()).catch_unwind() => {
344+
match polled {
345+
Ok(Some(item_or_err)) => item_or_err,
346+
Ok(None) => break, // stream finished
347+
Err(panic) => {
348+
let msg = panic
349+
.downcast_ref::<&str>()
350+
.map(|s| s.to_string())
351+
.or_else(|| panic.downcast_ref::<String>().cloned())
352+
.unwrap_or_else(|| "unknown panic".to_string());
353+
let _ = batch_tx.send(internal_err!(
354+
"BufferExec input stream panicked: {msg}"
355+
));
356+
break;
357+
}
358+
}
345359
}
346360
};
347361

@@ -555,6 +569,29 @@ mod tests {
555569
Ok(())
556570
}
557571

572+
#[tokio::test]
573+
async fn panic_in_input_is_propagated() -> Result<(), Box<dyn Error>> {
574+
// Panic mid-stream, modelling a panic deeper in the plan.
575+
let input = futures::stream::iter([1, 2, 3, 4]).map(|v| {
576+
if v == 3 {
577+
panic!("boom on 3");
578+
}
579+
Ok(v)
580+
});
581+
let (_, res) = memory_pool_and_reservation();
582+
583+
let mut buffered = MemoryBufferedStream::new(input, 10, res);
584+
wait_for_buffering().await;
585+
586+
pull_ok_msg(&mut buffered).await?;
587+
pull_ok_msg(&mut buffered).await?;
588+
// The panic is caught and surfaced as a stream error instead of a silent EOF.
589+
let err = pull_err_msg(&mut buffered).await?;
590+
assert_contains!(err.to_string(), "panicked");
591+
592+
Ok(())
593+
}
594+
558595
#[tokio::test]
559596
async fn memory_gets_released_if_stream_drops() -> Result<(), Box<dyn Error>> {
560597
let input = futures::stream::iter([1, 2, 3, 4]).map(Ok);

0 commit comments

Comments
 (0)