Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 43 additions & 6 deletions datafusion/physical-plan/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ use datafusion_physical_expr_common::metrics::{
};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use futures::{Stream, StreamExt, TryStreamExt};
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use pin_project_lite::pin_project;
use std::any::Any;
use std::fmt;
use std::panic::AssertUnwindSafe;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down Expand Up @@ -337,11 +338,24 @@ impl<T: Send + SizedMessage + 'static> MemoryBufferedStream<T> {
let item_or_err = tokio::select! {
biased;
_ = batch_tx.closed() => break,
item_or_err = input.next() => {
let Some(item_or_err) = item_or_err else {
break; // stream finished
};
item_or_err
// Catch a panic in the input poll so it surfaces as a stream error
// instead of dropping `batch_tx` and looking like a clean EOF.
polled = AssertUnwindSafe(input.next()).catch_unwind() => {
match polled {
Ok(Some(item_or_err)) => item_or_err,
Ok(None) => break, // stream finished
Err(panic) => {
let msg = panic
.downcast_ref::<&str>()
.map(|s| s.to_string())
.or_else(|| panic.downcast_ref::<String>().cloned())
.unwrap_or_else(|| "unknown panic".to_string());
let _ = batch_tx.send(internal_err!(
"BufferExec input stream panicked: {msg}"
));
break;
}
}
}
};

Expand Down Expand Up @@ -555,6 +569,29 @@ mod tests {
Ok(())
}

/// A panic raised while polling the input stream must reach the consumer as a
/// stream error that carries the original panic message, rather than looking
/// like a clean end-of-stream.
#[tokio::test]
async fn panic_in_input_is_propagated() -> Result<(), Box<dyn Error>> {
    // Panic mid-stream, modelling a panic deeper in the plan: the first two
    // items are produced normally and the third poll panics.
    let input = futures::stream::iter([1, 2, 3, 4]).map(|v| {
        if v == 3 {
            panic!("boom on 3");
        }
        Ok(v)
    });
    let (_, res) = memory_pool_and_reservation();

    let mut buffered = MemoryBufferedStream::new(input, 10, res);
    wait_for_buffering().await;

    // Items yielded before the panic are still delivered.
    pull_ok_msg(&mut buffered).await?;
    pull_ok_msg(&mut buffered).await?;

    // The panic is caught and surfaced as a stream error instead of a silent EOF.
    let err = pull_err_msg(&mut buffered).await?;
    let rendered = err.to_string();
    assert_contains!(rendered, "panicked");
    // The panic payload must be preserved in the error; without this check a
    // regression to the "unknown panic" fallback would go unnoticed.
    assert_contains!(rendered, "boom on 3");

    Ok(())
}

#[tokio::test]
async fn memory_gets_released_if_stream_drops() -> Result<(), Box<dyn Error>> {
let input = futures::stream::iter([1, 2, 3, 4]).map(Ok);
Expand Down
Loading