@@ -24,8 +24,6 @@ use arrow::record_batch::RecordBatch;
2424use datafusion_common:: { DataFusionError , Result } ;
2525use datafusion_execution:: memory_pool:: MemoryReservation ;
2626use log:: warn;
27- use std:: any:: Any ;
28- use std:: panic:: { AssertUnwindSafe , catch_unwind} ;
2927use std:: sync:: Arc ;
3028
3129#[ derive( Debug , Copy , Clone , Default ) ]
@@ -143,7 +141,9 @@ impl BatchBuilder {
143141 . iter ( )
144142 . map ( |( _, batch) | batch. column ( column_idx) . as_ref ( ) )
145143 . collect ( ) ;
146- recover_offset_overflow_from_panic ( || interleave ( & arrays, indices) )
144+ // Arrow 58.1.0+ returns OffsetOverflowError directly from
145+ // interleave, allowing retry_interleave to shrink the batch.
146+ interleave ( & arrays, indices) . map_err ( Into :: into)
147147 } )
148148 . collect :: < Result < Vec < _ > > > ( )
149149 }
@@ -243,33 +243,11 @@ fn is_offset_overflow(e: &DataFusionError) -> bool {
243243 )
244244}
245245
246+ #[ cfg( test) ]
246247fn offset_overflow_error ( ) -> DataFusionError {
247248 DataFusionError :: ArrowError ( Box :: new ( ArrowError :: OffsetOverflowError ( 0 ) ) , None )
248249}
249250
250- fn recover_offset_overflow_from_panic < T , F > ( f : F ) -> Result < T >
251- where
252- F : FnOnce ( ) -> std:: result:: Result < T , ArrowError > ,
253- {
254- // Arrow's interleave can panic on i32 offset overflow with
255- // `.expect("overflow")` / `.expect("offset overflow")`.
256- // Catch only those specific panics so the caller can retry
257- // with fewer rows while unrelated defects still unwind.
258- //
259- // TODO: remove once arrow-rs#9549 lands — interleave will return
260- // OffsetOverflowError directly instead of panicking.
261- match catch_unwind ( AssertUnwindSafe ( f) ) {
262- Ok ( result) => Ok ( result?) ,
263- Err ( panic_payload) => {
264- if is_arrow_offset_overflow_panic ( panic_payload. as_ref ( ) ) {
265- Err ( offset_overflow_error ( ) )
266- } else {
267- std:: panic:: resume_unwind ( panic_payload) ;
268- }
269- }
270- }
271- }
272-
273251fn retry_interleave < T , F > (
274252 mut rows_to_emit : usize ,
275253 total_rows : usize ,
@@ -281,6 +259,7 @@ where
281259 loop {
282260 match interleave ( rows_to_emit) {
283261 Ok ( value) => return Ok ( ( rows_to_emit, value) ) ,
262+ // Only offset overflow is recoverable by emitting fewer rows.
284263 Err ( e) if is_offset_overflow ( & e) => {
285264 rows_to_emit /= 2 ;
286265 if rows_to_emit == 0 {
@@ -295,26 +274,34 @@ where
295274 }
296275}
297276
298- fn panic_message ( payload : & ( dyn Any + Send ) ) -> Option < & str > {
299- if let Some ( msg) = payload. downcast_ref :: < & str > ( ) {
300- return Some ( msg) ;
301- }
302- if let Some ( msg) = payload. downcast_ref :: < String > ( ) {
303- return Some ( msg. as_str ( ) ) ;
304- }
305- None
306- }
307-
308- /// Returns true if a caught panic payload matches the Arrow offset overflows
309- /// raised by interleave's offset builders.
310- fn is_arrow_offset_overflow_panic ( payload : & ( dyn Any + Send ) ) -> bool {
311- matches ! ( panic_message( payload) , Some ( "overflow" | "offset overflow" ) )
312- }
313-
314277#[ cfg( test) ]
315278mod tests {
316279 use super :: * ;
317- use arrow:: error:: ArrowError ;
280+ use arrow:: array:: { Array , ArrayDataBuilder , Int32Array , ListArray } ;
281+ use arrow:: buffer:: Buffer ;
282+ use arrow:: datatypes:: { DataType , Field , Schema } ;
283+ use datafusion_execution:: memory_pool:: {
284+ MemoryConsumer , MemoryPool , UnboundedMemoryPool ,
285+ } ;
286+
287+ fn overflow_list_batch ( ) -> RecordBatch {
288+ let values_field = Arc :: new ( Field :: new_list_field ( DataType :: Int32 , true ) ) ;
289+ // SAFETY: This intentionally constructs an invalid child length so
290+ // Arrow's interleave hits offset overflow before touching child data.
291+ let list = ListArray :: from ( unsafe {
292+ ArrayDataBuilder :: new ( DataType :: List ( Arc :: clone ( & values_field) ) )
293+ . len ( 1 )
294+ . add_buffer ( Buffer :: from_slice_ref ( [ 0_i32 , i32:: MAX ] ) )
295+ . add_child_data ( Int32Array :: from ( Vec :: < i32 > :: new ( ) ) . to_data ( ) )
296+ . build_unchecked ( )
297+ } ) ;
298+ let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new(
299+ "list_col" ,
300+ DataType :: List ( values_field) ,
301+ true ,
302+ ) ] ) ) ;
303+ RecordBatch :: try_new ( schema, vec ! [ Arc :: new( list) ] ) . unwrap ( )
304+ }
318305
319306 #[ test]
320307 fn test_retry_interleave_halves_rows_until_success ( ) {
@@ -336,43 +323,37 @@ mod tests {
336323 }
337324
338325 #[ test]
339- fn test_recover_offset_overflow_from_panic ( ) {
340- let error = recover_offset_overflow_from_panic (
341- || -> std:: result:: Result < ( ) , ArrowError > { panic ! ( "offset overflow" ) } ,
342- )
343- . unwrap_err ( ) ;
344-
345- assert ! ( is_offset_overflow( & error) ) ;
326+ fn test_is_offset_overflow_matches_arrow_error ( ) {
327+ assert ! ( is_offset_overflow( & offset_overflow_error( ) ) ) ;
346328 }
347329
348330 #[ test]
349- fn test_recover_offset_overflow_from_panic_rethrows_unrelated_panics ( ) {
350- let panic_payload = catch_unwind ( AssertUnwindSafe ( || {
351- let _ = recover_offset_overflow_from_panic (
352- || -> std:: result:: Result < ( ) , ArrowError > { panic ! ( "capacity overflow" ) } ,
353- ) ;
354- } ) ) ;
355-
356- assert ! ( panic_payload. is_err( ) ) ;
331+ fn test_retry_interleave_does_not_retry_non_offset_errors ( ) {
332+ let mut attempts = Vec :: new ( ) ;
333+
334+ let error = retry_interleave ( 4 , 4 , |rows_to_emit| {
335+ attempts. push ( rows_to_emit) ;
336+ Err :: < ( ) , _ > ( DataFusionError :: Execution ( "boom" . into ( ) ) )
337+ } )
338+ . unwrap_err ( ) ;
339+
340+ assert_eq ! ( attempts, vec![ 4 ] ) ;
341+ assert ! ( matches!( error, DataFusionError :: Execution ( msg) if msg == "boom" ) ) ;
357342 }
358343
359344 #[ test]
360- fn test_is_arrow_offset_overflow_panic ( ) {
361- let overflow = Box :: new ( "overflow" ) as Box < dyn Any + Send > ;
362- assert ! ( is_arrow_offset_overflow_panic( overflow. as_ref( ) ) ) ;
363-
364- let offset_overflow =
365- Box :: new ( String :: from ( "offset overflow" ) ) as Box < dyn Any + Send > ;
366- assert ! ( is_arrow_offset_overflow_panic( offset_overflow. as_ref( ) ) ) ;
367-
368- let capacity_overflow = Box :: new ( "capacity overflow" ) as Box < dyn Any + Send > ;
369- assert ! ( !is_arrow_offset_overflow_panic( capacity_overflow. as_ref( ) ) ) ;
370-
371- let arithmetic_overflow =
372- Box :: new ( String :: from ( "attempt to multiply with overflow" ) )
373- as Box < dyn Any + Send > ;
374- assert ! ( !is_arrow_offset_overflow_panic(
375- arithmetic_overflow. as_ref( )
376- ) ) ;
345+ fn test_try_interleave_columns_surfaces_arrow_offset_overflow ( ) {
346+ let batch = overflow_list_batch ( ) ;
347+ let schema = batch. schema ( ) ;
348+ let pool: Arc < dyn MemoryPool > = Arc :: new ( UnboundedMemoryPool :: default ( ) ) ;
349+ let reservation = MemoryConsumer :: new ( "test" ) . register ( & pool) ;
350+ let mut builder = BatchBuilder :: new ( schema, 1 , 2 , reservation) ;
351+ builder. push_batch ( 0 , batch) . unwrap ( ) ;
352+
353+ let error = builder
354+ . try_interleave_columns ( & [ ( 0 , 0 ) , ( 0 , 0 ) ] )
355+ . unwrap_err ( ) ;
356+
357+ assert ! ( is_offset_overflow( & error) ) ;
377358 }
378359}
0 commit comments