1616// under the License.
1717
1818use crate :: spill:: get_record_batch_memory_size;
19+ use arrow:: array:: ArrayRef ;
1920use arrow:: compute:: interleave;
2021use arrow:: datatypes:: SchemaRef ;
22+ use arrow:: error:: ArrowError ;
2123use arrow:: record_batch:: RecordBatch ;
22- use datafusion_common:: Result ;
24+ use datafusion_common:: { DataFusionError , Result } ;
2325use datafusion_execution:: memory_pool:: MemoryReservation ;
26+ use log:: warn;
27+ use std:: any:: Any ;
28+ use std:: panic:: { AssertUnwindSafe , catch_unwind} ;
2429use std:: sync:: Arc ;
2530
2631#[ derive( Debug , Copy , Clone , Default ) ]
@@ -40,9 +45,24 @@ pub struct BatchBuilder {
4045 /// Maintain a list of [`RecordBatch`] and their corresponding stream
4146 batches : Vec < ( usize , RecordBatch ) > ,
4247
43- /// Accounts for memory used by buffered batches
48+ /// Accounts for memory used by buffered batches.
49+ ///
50+ /// May include pre-reserved bytes (from `sort_spill_reservation_bytes`)
51+ /// that were transferred via [`MemoryReservation::take()`] to prevent
52+ /// starvation when concurrent sort partitions compete for pool memory.
4453 reservation : MemoryReservation ,
4554
55+ /// Tracks the actual memory used by buffered batches (not including
56+ /// pre-reserved bytes). This allows [`Self::push_batch`] to skip pool
57+ /// allocation requests when the pre-reserved bytes cover the batch.
58+ batches_mem_used : usize ,
59+
60+ /// The initial reservation size at construction time. When the reservation
61+ /// is pre-loaded with `sort_spill_reservation_bytes` (via `take()`), this
62+ /// records that amount so we never shrink below it, maintaining the
63+ /// anti-starvation guarantee throughout the merge.
64+ initial_reservation : usize ,
65+
4666 /// The current [`BatchCursor`] for each stream
4767 cursors : Vec < BatchCursor > ,
4868
@@ -59,19 +79,26 @@ impl BatchBuilder {
5979 batch_size : usize ,
6080 reservation : MemoryReservation ,
6181 ) -> Self {
82+ let initial_reservation = reservation. size ( ) ;
6283 Self {
6384 schema,
6485 batches : Vec :: with_capacity ( stream_count * 2 ) ,
6586 cursors : vec ! [ BatchCursor :: default ( ) ; stream_count] ,
6687 indices : Vec :: with_capacity ( batch_size) ,
6788 reservation,
89+ batches_mem_used : 0 ,
90+ initial_reservation,
6891 }
6992 }
7093
7194 /// Append a new batch in `stream_idx`
7295 pub fn push_batch ( & mut self , stream_idx : usize , batch : RecordBatch ) -> Result < ( ) > {
73- self . reservation
74- . try_grow ( get_record_batch_memory_size ( & batch) ) ?;
96+ let size = get_record_batch_memory_size ( & batch) ;
97+ self . batches_mem_used += size;
98+ // Only request additional memory from the pool when actual batch
99+ // usage exceeds the current reservation (which may include
100+ // pre-reserved bytes from sort_spill_reservation_bytes).
101+ try_grow_reservation_to_at_least ( & mut self . reservation , self . batches_mem_used ) ?;
75102 let batch_idx = self . batches . len ( ) ;
76103 self . batches . push ( ( stream_idx, batch) ) ;
77104 self . cursors [ stream_idx] = BatchCursor {
@@ -104,53 +131,241 @@ impl BatchBuilder {
104131 & self . schema
105132 }
106133
134+ /// Try to interleave all columns using the given index slice.
135+ fn try_interleave_columns (
136+ & self ,
137+ indices : & [ ( usize , usize ) ] ,
138+ ) -> Result < Vec < ArrayRef > > {
139+ ( 0 ..self . schema . fields . len ( ) )
140+ . map ( |column_idx| {
141+ let arrays: Vec < _ > = self
142+ . batches
143+ . iter ( )
144+ . map ( |( _, batch) | batch. column ( column_idx) . as_ref ( ) )
145+ . collect ( ) ;
146+ recover_offset_overflow_from_panic ( || interleave ( & arrays, indices) )
147+ } )
148+ . collect :: < Result < Vec < _ > > > ( )
149+ }
150+
151+ /// Builds a record batch from the first `rows_to_emit` buffered rows.
152+ fn finish_record_batch (
153+ & mut self ,
154+ rows_to_emit : usize ,
155+ columns : Vec < ArrayRef > ,
156+ ) -> Result < RecordBatch > {
157+ // Remove consumed indices, keeping any remaining for the next call.
158+ self . indices . drain ( ..rows_to_emit) ;
159+
160+ // Only clean up fully-consumed batches when all indices are drained,
161+ // because remaining indices may still reference earlier batches.
162+ if self . indices . is_empty ( ) {
163+ // New cursors are only created once the previous cursor for the stream
164+ // is finished. This means all remaining rows from all but the last batch
165+ // for each stream have been yielded to the newly created record batch
166+ //
167+ // We can therefore drop all but the last batch for each stream
168+ let mut batch_idx = 0 ;
169+ let mut retained = 0 ;
170+ self . batches . retain ( |( stream_idx, batch) | {
171+ let stream_cursor = & mut self . cursors [ * stream_idx] ;
172+ let retain = stream_cursor. batch_idx == batch_idx;
173+ batch_idx += 1 ;
174+
175+ if retain {
176+ stream_cursor. batch_idx = retained;
177+ retained += 1 ;
178+ } else {
179+ self . batches_mem_used -= get_record_batch_memory_size ( batch) ;
180+ }
181+ retain
182+ } ) ;
183+ }
184+
185+ // Release excess memory back to the pool, but never shrink below
186+ // initial_reservation to maintain the anti-starvation guarantee
187+ // for the merge phase.
188+ let target = self . batches_mem_used . max ( self . initial_reservation ) ;
189+ if self . reservation . size ( ) > target {
190+ self . reservation . shrink ( self . reservation . size ( ) - target) ;
191+ }
192+
193+ RecordBatch :: try_new ( Arc :: clone ( & self . schema ) , columns) . map_err ( Into :: into)
194+ }
195+
107196 /// Drains the in_progress row indexes, and builds a new RecordBatch from them
108197 ///
109- /// Will then drop any batches for which all rows have been yielded to the output
198+ /// Will then drop any batches for which all rows have been yielded to the output.
199+ /// If an offset overflow occurs (e.g. string/list offsets exceed i32::MAX),
200+ /// retries with progressively fewer rows until it succeeds.
110201 ///
111202 /// Returns `None` if no pending rows
112203 pub fn build_record_batch ( & mut self ) -> Result < Option < RecordBatch > > {
113204 if self . is_empty ( ) {
114205 return Ok ( None ) ;
115206 }
116207
117- let columns = ( 0 ..self . schema . fields . len ( ) )
118- . map ( |column_idx| {
119- let arrays: Vec < _ > = self
120- . batches
121- . iter ( )
122- . map ( |( _, batch) | batch. column ( column_idx) . as_ref ( ) )
123- . collect ( ) ;
124- Ok ( interleave ( & arrays, & self . indices ) ?)
125- } )
126- . collect :: < Result < Vec < _ > > > ( ) ?;
127-
128- self . indices . clear ( ) ;
129-
130- // New cursors are only created once the previous cursor for the stream
131- // is finished. This means all remaining rows from all but the last batch
132- // for each stream have been yielded to the newly created record batch
133- //
134- // We can therefore drop all but the last batch for each stream
135- let mut batch_idx = 0 ;
136- let mut retained = 0 ;
137- self . batches . retain ( |( stream_idx, batch) | {
138- let stream_cursor = & mut self . cursors [ * stream_idx] ;
139- let retain = stream_cursor. batch_idx == batch_idx;
140- batch_idx += 1 ;
141-
142- if retain {
143- stream_cursor. batch_idx = retained;
144- retained += 1 ;
208+ let ( rows_to_emit, columns) =
209+ retry_interleave ( self . indices . len ( ) , self . indices . len ( ) , |rows_to_emit| {
210+ self . try_interleave_columns ( & self . indices [ ..rows_to_emit] )
211+ } ) ?;
212+
213+ Ok ( Some ( self . finish_record_batch ( rows_to_emit, columns) ?) )
214+ }
215+ }
216+
217+ /// Try to grow `reservation` so it covers at least `needed` bytes.
218+ ///
219+ /// When a reservation has been pre-loaded with bytes (e.g. via
220+ /// [`MemoryReservation::take()`]), this avoids redundant pool
221+ /// allocations: if the reservation already covers `needed`, this is
222+ /// a no-op; otherwise only the deficit is requested from the pool.
223+ pub ( crate ) fn try_grow_reservation_to_at_least (
224+ reservation : & mut MemoryReservation ,
225+ needed : usize ,
226+ ) -> Result < ( ) > {
227+ if needed > reservation. size ( ) {
228+ reservation. try_grow ( needed - reservation. size ( ) ) ?;
229+ }
230+ Ok ( ( ) )
231+ }
232+
233+ /// Returns true if the error is an Arrow offset overflow.
234+ fn is_offset_overflow ( e : & DataFusionError ) -> bool {
235+ matches ! (
236+ e,
237+ DataFusionError :: ArrowError ( boxed, _)
238+ if matches!( boxed. as_ref( ) , ArrowError :: OffsetOverflowError ( _) )
239+ )
240+ }
241+
242+ fn offset_overflow_error ( ) -> DataFusionError {
243+ DataFusionError :: ArrowError ( Box :: new ( ArrowError :: OffsetOverflowError ( 0 ) ) , None )
244+ }
245+
246+ fn recover_offset_overflow_from_panic < T , F > ( f : F ) -> Result < T >
247+ where
248+ F : FnOnce ( ) -> std:: result:: Result < T , ArrowError > ,
249+ {
250+ // Arrow's interleave can panic on i32 offset overflow with
251+ // `.expect("overflow")` / `.expect("offset overflow")`.
252+ // Catch only those specific panics so the caller can retry
253+ // with fewer rows while unrelated defects still unwind.
254+ match catch_unwind ( AssertUnwindSafe ( f) ) {
255+ Ok ( result) => Ok ( result?) ,
256+ Err ( panic_payload) => {
257+ if is_arrow_offset_overflow_panic ( panic_payload. as_ref ( ) ) {
258+ Err ( offset_overflow_error ( ) )
259+ } else {
260+ std:: panic:: resume_unwind ( panic_payload) ;
261+ }
262+ }
263+ }
264+ }
265+
266+ fn retry_interleave < T , F > (
267+ mut rows_to_emit : usize ,
268+ total_rows : usize ,
269+ mut interleave : F ,
270+ ) -> Result < ( usize , T ) >
271+ where
272+ F : FnMut ( usize ) -> Result < T > ,
273+ {
274+ loop {
275+ match interleave ( rows_to_emit) {
276+ Ok ( value) => return Ok ( ( rows_to_emit, value) ) ,
277+ Err ( e) if is_offset_overflow ( & e) => {
278+ rows_to_emit /= 2 ;
279+ if rows_to_emit == 0 {
280+ return Err ( e) ;
281+ }
282+ warn ! (
283+ "Interleave offset overflow with {total_rows} rows, retrying with {rows_to_emit}"
284+ ) ;
285+ }
286+ Err ( e) => return Err ( e) ,
287+ }
288+ }
289+ }
290+
291+ fn panic_message ( payload : & ( dyn Any + Send ) ) -> Option < & str > {
292+ if let Some ( msg) = payload. downcast_ref :: < & str > ( ) {
293+ return Some ( msg) ;
294+ }
295+ if let Some ( msg) = payload. downcast_ref :: < String > ( ) {
296+ return Some ( msg. as_str ( ) ) ;
297+ }
298+ None
299+ }
300+
301+ /// Returns true if a caught panic payload matches the Arrow offset overflows
302+ /// raised by interleave's offset builders.
303+ fn is_arrow_offset_overflow_panic ( payload : & ( dyn Any + Send ) ) -> bool {
304+ matches ! ( panic_message( payload) , Some ( "overflow" | "offset overflow" ) )
305+ }
306+
307+ #[ cfg( test) ]
308+ mod tests {
309+ use super :: * ;
310+ use arrow:: error:: ArrowError ;
311+
312+ #[ test]
313+ fn test_retry_interleave_halves_rows_until_success ( ) {
314+ let mut attempts = Vec :: new ( ) ;
315+
316+ let ( rows_to_emit, result) = retry_interleave ( 4 , 4 , |rows_to_emit| {
317+ attempts. push ( rows_to_emit) ;
318+ if rows_to_emit > 1 {
319+ Err ( offset_overflow_error ( ) )
145320 } else {
146- self . reservation . shrink ( get_record_batch_memory_size ( batch ) ) ;
321+ Ok ( "ok" )
147322 }
148- retain
149- } ) ;
323+ } )
324+ . unwrap ( ) ;
325+
326+ assert_eq ! ( rows_to_emit, 1 ) ;
327+ assert_eq ! ( result, "ok" ) ;
328+ assert_eq ! ( attempts, vec![ 4 , 2 , 1 ] ) ;
329+ }
330+
331+ #[ test]
332+ fn test_recover_offset_overflow_from_panic ( ) {
333+ let error = recover_offset_overflow_from_panic (
334+ || -> std:: result:: Result < ( ) , ArrowError > { panic ! ( "offset overflow" ) } ,
335+ )
336+ . unwrap_err ( ) ;
337+
338+ assert ! ( is_offset_overflow( & error) ) ;
339+ }
340+
341+ #[ test]
342+ fn test_recover_offset_overflow_from_panic_rethrows_unrelated_panics ( ) {
343+ let panic_payload = catch_unwind ( AssertUnwindSafe ( || {
344+ let _ = recover_offset_overflow_from_panic (
345+ || -> std:: result:: Result < ( ) , ArrowError > { panic ! ( "capacity overflow" ) } ,
346+ ) ;
347+ } ) ) ;
348+
349+ assert ! ( panic_payload. is_err( ) ) ;
350+ }
351+
352+ #[ test]
353+ fn test_is_arrow_offset_overflow_panic ( ) {
354+ let overflow = Box :: new ( "overflow" ) as Box < dyn Any + Send > ;
355+ assert ! ( is_arrow_offset_overflow_panic( overflow. as_ref( ) ) ) ;
356+
357+ let offset_overflow =
358+ Box :: new ( String :: from ( "offset overflow" ) ) as Box < dyn Any + Send > ;
359+ assert ! ( is_arrow_offset_overflow_panic( offset_overflow. as_ref( ) ) ) ;
360+
361+ let capacity_overflow = Box :: new ( "capacity overflow" ) as Box < dyn Any + Send > ;
362+ assert ! ( !is_arrow_offset_overflow_panic( capacity_overflow. as_ref( ) ) ) ;
150363
151- Ok ( Some ( RecordBatch :: try_new (
152- Arc :: clone ( & self . schema ) ,
153- columns,
154- ) ?) )
364+ let arithmetic_overflow =
365+ Box :: new ( String :: from ( "attempt to multiply with overflow" ) )
366+ as Box < dyn Any + Send > ;
367+ assert ! ( !is_arrow_offset_overflow_panic(
368+ arithmetic_overflow. as_ref( )
369+ ) ) ;
155370 }
156371}
0 commit comments