@@ -30,7 +30,8 @@ use arrow::datatypes::ArrowPrimitiveType;
3030use datafusion_expr_common:: groups_accumulator:: EmitTo ;
3131
3232use crate :: aggregate:: groups_accumulator:: {
33- BlockedGroupIndexOperations , FlatGroupIndexOperations , GroupIndexOperations ,
33+ ensure_room_enough_for_blocks, Block , BlockedGroupIndexOperations ,
34+ FlatGroupIndexOperations , GroupIndexOperations ,
3435} ;
3536
3637/// Track the accumulator null state per row: if any values for that
@@ -347,67 +348,31 @@ impl BlockedSeenValues {
347348 }
348349}
349350
350- impl SeenValues for BlockedSeenValues {
351- fn resize ( & mut self , total_num_groups : usize , default_value : bool ) {
352- let block_size = self . block_size ;
353- let blocked_builder = & mut self . blocked_builders ;
354-
355- // For resize, we need to:
356- // 1. Ensure the blks are enough first
357- // 2. and then ensure slots in blks are enough
358- let ( mut cur_blk_idx, exist_slots) = if blocked_builder. len ( ) > 0 {
359- let cur_blk_idx = blocked_builder. len ( ) - 1 ;
360- let exist_slots = ( blocked_builder. len ( ) - 1 ) * block_size
361- + blocked_builder. back ( ) . unwrap ( ) . len ( ) ;
362-
363- ( cur_blk_idx, exist_slots)
364- } else {
365- ( 0 , 0 )
366- } ;
367-
368- // No new groups, don't need to expand, just return
369- if exist_slots >= total_num_groups {
370- return ;
371- }
372-
373- // 1. Ensure blks are enough
374- let exist_blks = blocked_builder. len ( ) ;
375- let new_blks = ( ( total_num_groups + block_size - 1 ) / block_size) - exist_blks;
376- if new_blks > 0 {
377- for _ in 0 ..new_blks {
378- blocked_builder. push_back ( BooleanBufferBuilder :: new ( block_size) ) ;
379- }
380- }
351+ impl Block for BooleanBufferBuilder {
352+ type T = bool ;
381353
382- // 2. Ensure slots are enough
383- let mut new_slots = total_num_groups - exist_slots;
384-
385- // 2.1 Only fill current blk if it may be already enough
386- let cur_blk_rest_slots = block_size - blocked_builder[ cur_blk_idx] . len ( ) ;
387- if cur_blk_rest_slots >= new_slots {
388- blocked_builder[ cur_blk_idx] . append_n ( new_slots, default_value) ;
389- return ;
390- }
354+ fn len ( & self ) -> usize {
355+ self . len ( )
356+ }
391357
392- // 2.2 Fill current blk to full
393- blocked_builder[ cur_blk_idx] . append_n ( cur_blk_rest_slots, default_value) ;
394- new_slots -= cur_blk_rest_slots;
358+ fn fill_default_value ( & mut self , fill_len : usize , default_value : Self :: T ) {
359+ self . append_n ( fill_len, default_value) ;
360+ }
361+ }
395362
396- // 2.3 Fill complete blks
397- let complete_blks = new_slots / block_size;
398- for _ in 0 ..complete_blks {
399- cur_blk_idx += 1 ;
400- blocked_builder[ cur_blk_idx] . append_n ( block_size, default_value) ;
401- }
363+ impl SeenValues for BlockedSeenValues {
364+ fn resize ( & mut self , total_num_groups : usize , default_value : bool ) {
365+ let block_size = self . block_size ;
366+ let blocked_builders = & mut self . blocked_builders ;
367+ let new_block = |block_size : usize | BooleanBufferBuilder :: new ( block_size) ;
402368
403- // 2.4 Fill last blk if needed
404- let rest_slots = new_slots % block_size;
405- if rest_slots > 0 {
406- blocked_builder
407- . back_mut ( )
408- . unwrap ( )
409- . append_n ( rest_slots, default_value) ;
410- }
369+ ensure_room_enough_for_blocks (
370+ blocked_builders,
371+ total_num_groups,
372+ block_size,
373+ new_block,
374+ default_value,
375+ ) ;
411376 }
412377
413378 fn set_bit ( & mut self , block_id : u32 , block_offset : u64 , value : bool ) {
0 commit comments