From afdd4ea0653665abc774aef5bf3d534e4528fc68 Mon Sep 17 00:00:00 2001 From: wenqi mou Date: Fri, 26 Jun 2026 14:57:59 -0400 Subject: [PATCH] Propagate BufferExec input panics instead of silently truncating A panic while polling the input inside MemoryBufferedStream's producer task was caught by the tokio task harness, dropping the sender; the consumer then read the closed channel as a clean EOF and silently truncated this partition's output. Wrap the input poll in catch_unwind and forward the panic as a DataFusionError over the existing error channel, so it propagates and fails the query instead of being swallowed. Found in staging: a SinglePartitioned aggregate emitting a 16-byte FixedSizeBinary group-key array past Arrow's 2 GiB i32-offset limit panicked, yet the query "succeeded" with partial (under-counted) results. Co-Authored-By: Claude Opus 4.8 --- datafusion/physical-plan/src/buffer.rs | 49 ++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-plan/src/buffer.rs b/datafusion/physical-plan/src/buffer.rs index a59d062929974..d057583a78926 100644 --- a/datafusion/physical-plan/src/buffer.rs +++ b/datafusion/physical-plan/src/buffer.rs @@ -40,10 +40,11 @@ use datafusion_physical_expr_common::metrics::{ }; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; -use futures::{Stream, StreamExt, TryStreamExt}; +use futures::{FutureExt, Stream, StreamExt, TryStreamExt}; use pin_project_lite::pin_project; use std::any::Any; use std::fmt; +use std::panic::AssertUnwindSafe; use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -337,11 +338,24 @@ impl MemoryBufferedStream { let item_or_err = tokio::select! { biased; _ = batch_tx.closed() => break, - item_or_err = input.next() => { - let Some(item_or_err) = item_or_err else { - break; // stream finished - }; - item_or_err + // Catch a panic in the input poll so it surfaces as a stream error + // instead of dropping `batch_tx` and looking like a clean EOF. + polled = AssertUnwindSafe(input.next()).catch_unwind() => { + match polled { + Ok(Some(item_or_err)) => item_or_err, + Ok(None) => break, // stream finished + Err(panic) => { + let msg = panic + .downcast_ref::<&str>() + .map(|s| s.to_string()) + .or_else(|| panic.downcast_ref::().cloned()) + .unwrap_or_else(|| "unknown panic".to_string()); + let _ = batch_tx.send(internal_err!( + "BufferExec input stream panicked: {msg}" + )); + break; + } + } } }; @@ -555,6 +569,29 @@ mod tests { Ok(()) } + #[tokio::test] + async fn panic_in_input_is_propagated() -> Result<(), Box> { + // Panic mid-stream, modelling a panic deeper in the plan. + let input = futures::stream::iter([1, 2, 3, 4]).map(|v| { + if v == 3 { + panic!("boom on 3"); + } + Ok(v) + }); + let (_, res) = memory_pool_and_reservation(); + + let mut buffered = MemoryBufferedStream::new(input, 10, res); + wait_for_buffering().await; + + pull_ok_msg(&mut buffered).await?; + pull_ok_msg(&mut buffered).await?; + // The panic is caught and surfaced as a stream error instead of a silent EOF. + let err = pull_err_msg(&mut buffered).await?; + assert_contains!(err.to_string(), "panicked"); + + Ok(()) + } + #[tokio::test] async fn memory_gets_released_if_stream_drops() -> Result<(), Box> { let input = futures::stream::iter([1, 2, 3, 4]).map(Ok);