1515// specific language governing permissions and limitations
1616// under the License.
1717
18+ use std:: collections:: VecDeque ;
19+ use std:: iter;
1820use std:: mem:: size_of;
1921use std:: sync:: Arc ;
2022
21- use arrow:: array:: { ArrayRef , AsArray , BooleanArray , PrimitiveArray } ;
23+ use arrow:: array:: { ArrayRef , ArrowNativeTypeOp , AsArray , BooleanArray , PrimitiveArray } ;
2224use arrow:: buffer:: NullBuffer ;
2325use arrow:: compute;
2426use arrow:: datatypes:: ArrowPrimitiveType ;
2527use arrow:: datatypes:: DataType ;
2628use datafusion_common:: { internal_datafusion_err, DataFusionError , Result } ;
2729use datafusion_expr_common:: groups_accumulator:: { EmitTo , GroupsAccumulator } ;
2830
29- use super :: accumulate:: FlatNullState ;
31+ use crate :: aggregate:: groups_accumulator:: accumulate:: NullStateAdapter ;
32+ use crate :: aggregate:: groups_accumulator:: { ensure_room_enough_for_blocks, Block } ;
3033
3134/// An accumulator that implements a single operation over
3235/// [`ArrowPrimitiveType`] where the accumulated state is the same as
4346 T : ArrowPrimitiveType + Send ,
4447 F : Fn ( & mut T :: Native , T :: Native ) + Send + Sync ,
4548{
46- /// values per group, stored as the native type
47- values : Vec < T :: Native > ,
49+ /// Values per group, stored as the native type
50+ values : VecDeque < Vec < T :: Native > > ,
4851
4952 /// The output type (needed for Decimal precision and scale)
5053 data_type : DataType ,
@@ -53,10 +56,20 @@ where
5356 starting_value : T :: Native ,
5457
5558 /// Track nulls in the input / filters
56- null_state : FlatNullState ,
59+ null_state : NullStateAdapter ,
5760
5861 /// Function that computes the primitive result
5962 prim_fn : F ,
63+
64+ /// Block size of the current `GroupsAccumulator`, if any:
65+ /// - If `None`, block optimization is disabled and
66+ /// all `group values` are stored in a single `Vec`
67+ ///
68+ /// - If `Some(blk_size)`, block optimization is enabled:
69+ /// `group values` are stored in multiple `Vec`s, each
70+ /// `Vec` is of `blk_size` len, and we call it a `block`
71+ ///
72+ block_size : Option < usize > ,
6073}
6174
6275impl < T , F > PrimitiveGroupsAccumulator < T , F >
@@ -66,11 +79,12 @@ where
6679{
6780 pub fn new ( data_type : & DataType , prim_fn : F ) -> Self {
6881 Self {
69- values : vec ! [ ] ,
82+ values : VecDeque :: new ( ) ,
7083 data_type : data_type. clone ( ) ,
71- null_state : FlatNullState :: new ( ) ,
84+ null_state : NullStateAdapter :: new ( None ) ,
7285 starting_value : T :: default_value ( ) ,
7386 prim_fn,
87+ block_size : None ,
7488 }
7589 }
7690
@@ -96,17 +110,37 @@ where
96110 assert_eq ! ( values. len( ) , 1 , "single argument to update_batch" ) ;
97111 let values = values[ 0 ] . as_primitive :: < T > ( ) ;
98112
99- // update values
100- self . values . resize ( total_num_groups, self . starting_value ) ;
113+ // Expand to ensure values are large enough
114+ if let Some ( blk_size) = self . block_size {
115+ // Expand blocks in `blocked mode`
116+ let new_block = |block_size : usize | Vec :: with_capacity ( block_size) ;
117+ ensure_room_enough_for_blocks (
118+ & mut self . values ,
119+ total_num_groups,
120+ blk_size,
121+ new_block,
122+ self . starting_value ,
123+ ) ;
124+ } else {
125+ // Expand the single block in `flat mode`
126+ if self . values . is_empty ( ) {
127+ self . values . push_back ( Vec :: new ( ) ) ;
128+ }
129+
130+ self . values
131+ . back_mut ( )
132+ . unwrap ( )
133+ . resize ( total_num_groups, self . starting_value ) ;
134+ }
101135
102136 // NullState dispatches / handles tracking nulls and groups that saw no values
103137 self . null_state . accumulate (
104138 group_indices,
105139 values,
106140 opt_filter,
107141 total_num_groups,
108- |_ , group_index , new_value| {
109- let value = & mut self . values [ group_index as usize ] ;
142+ |block_id , block_offset , new_value| {
143+ let value = & mut self . values [ block_id as usize ] [ block_offset as usize ] ;
110144 ( self . prim_fn ) ( value, new_value) ;
111145 } ,
112146 ) ;
@@ -115,7 +149,7 @@ where
115149 }
116150
117151 fn evaluate ( & mut self , emit_to : EmitTo ) -> Result < ArrayRef > {
118- let values = emit_to. take_needed_rows ( & mut self . values ) ;
152+ let values = emit_to. take_needed ( & mut self . values , self . block_size . is_some ( ) ) ;
119153 let nulls = self . null_state . build ( emit_to) ;
120154 let values = PrimitiveArray :: < T > :: new ( values. into ( ) , Some ( nulls) ) // no copy
121155 . with_data_type ( self . data_type . clone ( ) ) ;
@@ -198,4 +232,28 @@ where
198232 fn size ( & self ) -> usize {
199233 self . values . capacity ( ) * size_of :: < T :: Native > ( ) + self . null_state . size ( )
200234 }
235+
236+ fn supports_blocked_groups ( & self ) -> bool {
237+ true
238+ }
239+
240+ fn alter_block_size ( & mut self , block_size : Option < usize > ) -> Result < ( ) > {
241+ self . values . clear ( ) ;
242+ self . null_state = NullStateAdapter :: new ( block_size) ;
243+ self . block_size = block_size;
244+
245+ Ok ( ( ) )
246+ }
247+ }
248+
249+ impl < N : ArrowNativeTypeOp > Block for Vec < N > {
250+ type T = N ;
251+
252+ fn len ( & self ) -> usize {
253+ self . len ( )
254+ }
255+
256+ fn fill_default_value ( & mut self , fill_len : usize , default_value : Self :: T ) {
257+ self . extend ( iter:: repeat ( default_value. clone ( ) ) . take ( fill_len) ) ;
258+ }
201259}
0 commit comments