Skip to content

Commit 808e8d7

Browse files
committed
support blocked mode for GroupValuesPrimitive.
1 parent 2cf18f4 commit 808e8d7

2 files changed

Lines changed: 141 additions & 27 deletions

File tree

datafusion/expr-common/src/groups_accumulator.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use datafusion_common::{not_impl_err, DataFusionError, Result};
2525
/// Describes how many rows should be emitted during grouping.
2626
#[derive(Debug, Clone, Copy)]
2727
pub enum EmitTo {
28-
/// Emit all groups
28+
/// Emit all groups, will clear all existing group indexes
2929
All,
3030
/// Emit only the first `n` groups and shift all existing group
3131
/// indexes down by `n`.
@@ -35,11 +35,7 @@ pub enum EmitTo {
3535
First(usize),
3636
/// Emit next block in the blocked managed groups
3737
///
38-
/// The flag's meaning:
39-
/// - `true` represents new groups still will be added,
40-
/// and we need to shift the values down.
41-
/// - `false` represents no new groups will be added again,
42-
/// and we don't need to shift the values down.
38+
/// Similar to `EmitTo::All`; will also clear all existing group indexes
4339
NextBlock(bool),
4440
}
4541

datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs

Lines changed: 139 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,9 @@ where
136136
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
137137
if let Some(block_size) = self.block_size {
138138
let before_add_group = |group_values: &mut VecDeque<Vec<T::Native>>| {
139-
if group_values.back().unwrap().len() == block_size {
139+
if group_values.is_empty()
140+
|| group_values.back().unwrap().len() == block_size
141+
{
140142
let new_block = Vec::with_capacity(block_size);
141143
group_values.push_back(new_block);
142144
}
@@ -156,15 +158,20 @@ where
156158
}
157159

158160
fn size(&self) -> usize {
159-
// self.map.capacity() * size_of::<usize>() + self.values.len
161+
self.map.capacity() * size_of::<usize>()
162+
+ self
163+
.values
164+
.iter()
165+
.map(|blk| blk.len() * blk.allocated_size())
166+
.sum::<usize>()
160167
}
161168

162169
fn is_empty(&self) -> bool {
163-
self.values.is_empty()
170+
self.len() == 0
164171
}
165172

166173
fn len(&self) -> usize {
167-
self.values.len()
174+
self.map.len() + self.null_group.map(|_| 1).unwrap_or_default()
168175
}
169176

170177
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
@@ -184,24 +191,29 @@ where
184191
}
185192

186193
let array: PrimitiveArray<T> = match emit_to {
194+
// ===============================================
195+
// Emitting in flat mode
196+
// ===============================================
187197
EmitTo::All => {
188198
assert!(
189199
self.block_size.is_none(),
190-
"only support EmitTo::All in flat group values"
200+
"only support EmitTo::All in flat mode"
191201
);
192202

193203
self.map.clear();
194204
build_primitive(
195205
std::mem::take(self.values.back_mut().unwrap()),
196-
self.null_group.take(),
206+
self.null_group.take().map(|idx| idx as usize),
197207
)
198208
}
209+
199210
EmitTo::First(n) => {
200211
assert!(
201212
self.block_size.is_none(),
202-
"only support EmitTo::First in flat group values"
213+
"only support EmitTo::First in flat mode"
203214
);
204215

216+
let n = n as u64;
205217
self.map.retain(|group_idx| {
206218
// Decrement group index by n
207219
match group_idx.checked_sub(n) {
@@ -214,6 +226,7 @@ where
214226
None => false,
215227
}
216228
});
229+
217230
let null_group = match &mut self.null_group {
218231
Some(v) if *v >= n => {
219232
*v -= n;
@@ -224,12 +237,107 @@ where
224237
};
225238

226239
let single_block = self.values.back_mut().unwrap();
227-
let mut split = single_block.split_off(n);
240+
let mut split = single_block.split_off(n as usize);
228241
std::mem::swap(single_block, &mut split);
229-
build_primitive(split, null_group)
242+
build_primitive(split, null_group.map(|idx| idx as usize))
243+
}
244+
245+
// ===============================================
246+
// Emitting in blocked mode
247+
// ===============================================
248+
// TODO: we should consider whether it is necessary to support modifying indices
249+
// in `EmitTo::NextBlock`. It is only used in the spilling case; maybe we can
250+
// always emit all in blocked mode. So, we just need to clear the map rather
251+
// than doing expensive modification for each bucket in it.
252+
EmitTo::NextBlock(true) => {
253+
assert!(
254+
self.block_size.is_some(),
255+
"only support EmitTo::Next in blocked group values"
256+
);
257+
258+
// We only emit the first block(`block_id == 0`),
259+
// so erase the entries with `block_id == 0`, and decrease entries with `block_id > 0`
260+
self.map.retain(|packed_idx| {
261+
let old_blk_id =
262+
BlockedGroupIndexOperations::get_block_id(*packed_idx);
263+
match old_blk_id.checked_sub(1) {
264+
// `block_id > 0`, shift `block_id` down
265+
Some(new_blk_id) => {
266+
let blk_offset =
267+
BlockedGroupIndexOperations::get_block_offset(
268+
*packed_idx,
269+
);
270+
let new_packed_idx = BlockedGroupIndexOperations::pack_index(
271+
new_blk_id as u32,
272+
blk_offset,
273+
);
274+
*packed_idx = new_packed_idx;
275+
276+
true
277+
}
278+
279+
// `block_id == 0`, so remove from table
280+
None => false,
281+
}
282+
});
283+
284+
// Similar as `non-nulls`, if `block_id > 0` we decrease, and if `block_id == 0` we erase
285+
let null_block_pair_opt = self.null_group.map(|packed_idx| {
286+
(
287+
BlockedGroupIndexOperations::get_block_id(packed_idx),
288+
BlockedGroupIndexOperations::get_block_offset(packed_idx),
289+
)
290+
});
291+
let null_idx = match null_block_pair_opt {
292+
Some((blk_id, blk_offset)) if blk_id > 0 => {
293+
let new_blk_id = blk_id - 1;
294+
let new_packed_idx = BlockedGroupIndexOperations::pack_index(
295+
new_blk_id, blk_offset,
296+
);
297+
self.null_group = Some(new_packed_idx);
298+
None
299+
}
300+
Some((_, blk_offset)) => {
301+
self.null_group = None;
302+
Some(blk_offset as usize)
303+
}
304+
None => None,
305+
};
306+
307+
let emit_blk = self.values.pop_front().unwrap();
308+
build_primitive(emit_blk, null_idx)
230309
}
231-
EmitTo::NextBlock(_) => {
232-
310+
311+
EmitTo::NextBlock(false) => {
312+
assert!(
313+
self.block_size.is_some(),
314+
"only support EmitTo::Next in blocked group values"
315+
);
316+
317+
let null_block_pair_opt = self.null_group.map(|packed_idx| {
318+
(
319+
BlockedGroupIndexOperations::get_block_id(packed_idx),
320+
BlockedGroupIndexOperations::get_block_offset(packed_idx),
321+
)
322+
});
323+
let null_idx = match null_block_pair_opt {
324+
Some((blk_id, blk_offset)) if blk_id > 0 => {
325+
let new_blk_id = blk_id - 1;
326+
let new_packed_idx = BlockedGroupIndexOperations::pack_index(
327+
new_blk_id, blk_offset,
328+
);
329+
self.null_group = Some(new_packed_idx);
330+
None
331+
}
332+
Some((_, blk_offset)) => {
333+
self.null_group = None;
334+
Some(blk_offset as usize)
335+
}
336+
None => None,
337+
};
338+
339+
let emit_blk = self.values.pop_front().unwrap();
340+
build_primitive(emit_blk, null_idx)
233341
}
234342
};
235343

@@ -238,8 +346,16 @@ where
238346

239347
fn clear_shrink(&mut self, batch: &RecordBatch) {
240348
let count = batch.num_rows();
241-
self.values.clear();
242-
self.values.shrink_to(count);
349+
350+
// TODO: Only reserve room of values in `flat mode` currently,
351+
// we may need to consider it again when supporting spilling
352+
// for `blocked mode`.
353+
if self.block_size.is_none() {
354+
let single_block = self.values.back_mut().unwrap();
355+
single_block.clear();
356+
single_block.shrink_to(count);
357+
}
358+
243359
self.map.clear();
244360
self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared
245361
}
@@ -265,16 +381,17 @@ where
265381
for v in cols[0].as_primitive::<T>() {
266382
let group_index = match v {
267383
None => *self.null_group.get_or_insert_with(|| {
268-
// actions before add new group like checking if room is enough
384+
// Actions before adding a new group, like checking if there is enough room
269385
before_add_group(&mut self.values);
270386

271-
// get block infos and update block
272-
let block_id = self.values.len() as u32;
387+
// Get block infos and update block,
388+
// we need `current block` and `next offset in block`
389+
let block_id = self.values.len() as u32 - 1;
273390
let current_block = self.values.back_mut().unwrap();
274391
let block_offset = current_block.len() as u64;
275392
current_block.push(Default::default());
276393

277-
// get group index and finish actions needed it
394+
// Get the group index and finish the actions that need it
278395
O::pack_index(block_id, block_offset)
279396
}),
280397
Some(key) => {
@@ -305,16 +422,17 @@ where
305422
match insert {
306423
hashbrown::hash_table::Entry::Occupied(o) => *o.get(),
307424
hashbrown::hash_table::Entry::Vacant(v) => {
308-
// actions before add new group like checking if room is enough
425+
// Actions before add new group like checking if room is enough
309426
before_add_group(&mut self.values);
310427

311-
// get block infos and update block
312-
let block_id = self.values.len() as u32;
428+
// Get block infos and update block,
429+
// we need `current block` and `next offset in block`
430+
let block_id = self.values.len() as u32 - 1;
313431
let current_block = self.values.back_mut().unwrap();
314432
let block_offset = current_block.len() as u64;
315433
current_block.push(key);
316434

317-
// get group index and finish actions needed it
435+
// Get group index and finish actions needed it
318436
let packed_index = O::pack_index(block_id, block_offset);
319437
v.insert(packed_index);
320438
packed_index

0 commit comments

Comments
 (0)