Skip to content

Commit 1960084

Browse files
committed
update EmitTo's comments and fix fmt.
1 parent b1c7656 commit 1960084

12 files changed

Lines changed: 100 additions & 51 deletions

File tree

datafusion-examples/examples/advanced_udaf.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ use datafusion::error::Result;
3131
use datafusion::prelude::*;
3232
use datafusion_common::{cast::as_float64_array, DataFusionError, ScalarValue};
3333
use datafusion_expr::{
34-
function::{AccumulatorArgs, StateFieldsArgs}, groups_accumulator::GroupIndices, Accumulator, AggregateUDF, AggregateUDFImpl, GroupsAccumulator, Signature
34+
function::{AccumulatorArgs, StateFieldsArgs},
35+
groups_accumulator::GroupIndices,
36+
Accumulator, AggregateUDF, AggregateUDFImpl, GroupsAccumulator, Signature,
3537
};
3638

3739
/// This example shows how to use the full AggregateUDFImpl API to implement a user
@@ -279,9 +281,11 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
279281

280282
let group_indices = match group_indices {
281283
GroupIndices::Flat(idxs) => idxs,
282-
GroupIndices::Blocked(_) => return Err(DataFusionError::NotImplemented(
283-
"blocked states management is not supported".to_string()),
284-
),
284+
GroupIndices::Blocked(_) => {
285+
return Err(DataFusionError::NotImplemented(
286+
"blocked states management is not supported".to_string(),
287+
))
288+
}
285289
};
286290

287291
let values = values[0].as_primitive::<Float64Type>();
@@ -318,9 +322,11 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
318322

319323
let group_indices = match group_indices {
320324
GroupIndices::Flat(idxs) => idxs,
321-
GroupIndices::Blocked(_) => return Err(DataFusionError::NotImplemented(
322-
"blocked states management is not supported".to_string()),
323-
),
325+
GroupIndices::Blocked(_) => {
326+
return Err(DataFusionError::NotImplemented(
327+
"blocked states management is not supported".to_string(),
328+
))
329+
}
324330
};
325331

326332
// first batch is counts, second is partial sums

datafusion/expr-common/src/groups_accumulator.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,10 @@ pub enum EmitTo {
3333
First(usize),
3434
/// Emit all groups managed by blocks
3535
AllBlocks,
36-
/// Try to emit only the first `n` groups similar as `First`.
37-
/// But `n` will be aligned to block size, and finally serval blocks will be returned.
36+
/// Emit only the first `n` group blocks,
37+
/// similar to `First`, but used in blocked `GroupValues` and `GroupsAccumulator`.
3838
///
39-
/// For example, `n= 10`, `block size=4`, `n` will be aligned to 12,
40-
/// and finally 3 blocks will be returned.
39+
/// For example, with `n = 3` and a block size of 4, `3 * 4 = 12` groups will be returned.
4140
FirstBlocks(usize),
4241
}
4342

datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@ use datafusion_common::{
3131
arrow_datafusion_err, utils::get_arrayref_at_indices, DataFusionError, Result,
3232
ScalarValue,
3333
};
34-
use datafusion_expr_common::{accumulator::Accumulator, groups_accumulator::GroupIndices};
3534
use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
35+
use datafusion_expr_common::{
36+
accumulator::Accumulator, groups_accumulator::GroupIndices,
37+
};
3638

3739
/// An adapter that implements [`GroupsAccumulator`] for any [`Accumulator`]
3840
///
@@ -259,9 +261,11 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
259261
) -> Result<()> {
260262
let group_indices = match group_indices {
261263
GroupIndices::Flat(idxs) => idxs,
262-
GroupIndices::Blocked(_) => return Err(DataFusionError::NotImplemented(
263-
"blocked states management is not supported".to_string()),
264-
),
264+
GroupIndices::Blocked(_) => {
265+
return Err(DataFusionError::NotImplemented(
266+
"blocked states management is not supported".to_string(),
267+
))
268+
}
265269
};
266270

267271
self.invoke_per_accumulator(
@@ -340,9 +344,11 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
340344
) -> Result<()> {
341345
let group_indices = match group_indices {
342346
GroupIndices::Flat(idxs) => idxs,
343-
GroupIndices::Blocked(_) => return Err(DataFusionError::NotImplemented(
344-
"blocked states management is not supported".to_string()),
345-
),
347+
GroupIndices::Blocked(_) => {
348+
return Err(DataFusionError::NotImplemented(
349+
"blocked states management is not supported".to_string(),
350+
))
351+
}
346352
};
347353

348354
self.invoke_per_accumulator(

datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,10 @@ mod test {
665665
let mut accumulated_values = vec![];
666666
let mut null_state = NullState::new();
667667

668-
let group_indices = group_indices.iter().map(|idx| *idx as u64).collect::<Vec<_>>();
668+
let group_indices = group_indices
669+
.iter()
670+
.map(|idx| *idx as u64)
671+
.collect::<Vec<_>>();
669672
null_state.accumulate(
670673
&group_indices,
671674
values,
@@ -728,7 +731,10 @@ mod test {
728731
) {
729732
let mut accumulated_values = vec![];
730733

731-
let group_indices = group_indices.iter().map(|idx| *idx as u64).collect::<Vec<_>>();
734+
let group_indices = group_indices
735+
.iter()
736+
.map(|idx| *idx as u64)
737+
.collect::<Vec<_>>();
732738
accumulate_indices(&group_indices, nulls, opt_filter, |group_index| {
733739
accumulated_values.push(group_index);
734740
});
@@ -783,7 +789,10 @@ mod test {
783789
let mut accumulated_values = vec![];
784790
let mut null_state = NullState::new();
785791

786-
let group_indices = group_indices.iter().map(|idx| *idx as u64).collect::<Vec<_>>();
792+
let group_indices = group_indices
793+
.iter()
794+
.map(|idx| *idx as u64)
795+
.collect::<Vec<_>>();
787796
null_state.accumulate_boolean(
788797
&group_indices,
789798
values,

datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ use std::sync::Arc;
2020
use arrow::array::{ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder};
2121
use arrow::buffer::BooleanBuffer;
2222
use datafusion_common::{DataFusionError, Result};
23-
use datafusion_expr_common::groups_accumulator::{EmitTo, GroupIndices, GroupsAccumulator};
23+
use datafusion_expr_common::groups_accumulator::{
24+
EmitTo, GroupIndices, GroupsAccumulator,
25+
};
2426

2527
use super::accumulate::NullState;
2628

@@ -73,12 +75,14 @@ where
7375
total_num_groups: usize,
7476
) -> Result<()> {
7577
assert_eq!(values.len(), 1, "single argument to update_batch");
76-
78+
7779
let group_indices = match group_indices {
7880
GroupIndices::Flat(idxs) => idxs,
79-
GroupIndices::Blocked(_) => return Err(DataFusionError::NotImplemented(
80-
"blocked states management is not supported".to_string()),
81-
),
81+
GroupIndices::Blocked(_) => {
82+
return Err(DataFusionError::NotImplemented(
83+
"blocked states management is not supported".to_string(),
84+
))
85+
}
8286
};
8387

8488
let values = values[0].as_boolean();
@@ -137,7 +141,7 @@ where
137141
fn merge_batch(
138142
&mut self,
139143
values: &[ArrayRef],
140-
group_indices:GroupIndices<'_>,
144+
group_indices: GroupIndices<'_>,
141145
opt_filter: Option<&BooleanArray>,
142146
total_num_groups: usize,
143147
) -> Result<()> {

datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ use arrow::compute;
2323
use arrow::datatypes::ArrowPrimitiveType;
2424
use arrow::datatypes::DataType;
2525
use datafusion_common::{internal_datafusion_err, DataFusionError, Result};
26-
use datafusion_expr_common::groups_accumulator::{EmitTo, GroupIndices, GroupsAccumulator};
26+
use datafusion_expr_common::groups_accumulator::{
27+
EmitTo, GroupIndices, GroupsAccumulator,
28+
};
2729

2830
use super::accumulate::NullState;
2931

@@ -96,9 +98,11 @@ where
9698

9799
let group_indices = match group_indices {
98100
GroupIndices::Flat(idxs) => idxs,
99-
GroupIndices::Blocked(_) => return Err(DataFusionError::NotImplemented(
100-
"blocked states management is not supported".to_string()),
101-
),
101+
GroupIndices::Blocked(_) => {
102+
return Err(DataFusionError::NotImplemented(
103+
"blocked states management is not supported".to_string(),
104+
))
105+
}
102106
};
103107

104108
let values = values[0].as_primitive::<T>();
@@ -137,7 +141,7 @@ where
137141
fn merge_batch(
138142
&mut self,
139143
values: &[ArrayRef],
140-
group_indices:GroupIndices<'_>,
144+
group_indices: GroupIndices<'_>,
141145
opt_filter: Option<&BooleanArray>,
142146
total_num_groups: usize,
143147
) -> Result<()> {

datafusion/physical-plan/src/aggregates/group_values/bytes.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,17 +46,16 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
4646
&mut self,
4747
cols: &[ArrayRef],
4848
groups: &mut Vec<u64>,
49-
group_type: GroupIndicesType
49+
group_type: GroupIndicesType,
5050
) -> datafusion_common::Result<()> {
5151
assert_eq!(cols.len(), 1);
52-
52+
5353
if group_type == GroupIndicesType::Blocked {
5454
return Err(DataFusionError::NotImplemented(
55-
"blocked group values management is not supported".to_string()),
56-
);
55+
"blocked group values management is not supported".to_string(),
56+
));
5757
}
5858

59-
6059
// look up / add entries in the table
6160
let arr = &cols[0];
6261

@@ -118,7 +117,11 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
118117
self.num_groups = 0;
119118
let mut group_indexes = vec![];
120119
// FIXME: When implementing blocked `GroupValuesByes`, we need a way to obtain the correct `group_type` here.
121-
self.intern(&[remaining_group_values], &mut group_indexes, GroupIndicesType::Flat)?;
120+
self.intern(
121+
&[remaining_group_values],
122+
&mut group_indexes,
123+
GroupIndicesType::Flat,
124+
)?;
122125

123126
// Verify that the group indexes were assigned in the correct order
124127
assert_eq!(0, group_indexes[0]);

datafusion/physical-plan/src/aggregates/group_values/bytes_view.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ impl GroupValues for GroupValuesBytesView {
5353

5454
if group_type == GroupIndicesType::Blocked {
5555
return Err(DataFusionError::NotImplemented(
56-
"blocked group values management is not supported".to_string()),
57-
);
56+
"blocked group values management is not supported".to_string(),
57+
));
5858
}
5959

6060
// look up / add entries in the table
@@ -118,7 +118,11 @@ impl GroupValues for GroupValuesBytesView {
118118
self.num_groups = 0;
119119
let mut group_indexes = vec![];
120120
// FIXME: When implementing blocked `GroupValuesBytesView`, we need a way to obtain the correct `group_type` here.
121-
self.intern(&[remaining_group_values], &mut group_indexes, GroupIndicesType::Flat)?;
121+
self.intern(
122+
&[remaining_group_values],
123+
&mut group_indexes,
124+
GroupIndicesType::Flat,
125+
)?;
122126

123127
// Verify that the group indexes were assigned in the correct order
124128
assert_eq!(0, group_indexes[0]);

datafusion/physical-plan/src/aggregates/group_values/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,12 @@ use datafusion_physical_expr::binary_map::OutputType;
3636
/// An interning store for group keys
3737
pub trait GroupValues: Send {
3838
/// Calculates the `groups` for each input row of `cols`
39-
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<u64>, group_type: GroupIndicesType) -> Result<()>;
39+
fn intern(
40+
&mut self,
41+
cols: &[ArrayRef],
42+
groups: &mut Vec<u64>,
43+
group_type: GroupIndicesType,
44+
) -> Result<()>;
4045

4146
/// Returns the number of bytes used by this [`GroupValues`]
4247
fn size(&self) -> usize;

datafusion/physical-plan/src/aggregates/group_values/primitive.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,8 @@ where
122122

123123
if group_type == GroupIndicesType::Blocked {
124124
return Err(DataFusionError::NotImplemented(
125-
"blocked group values management is not supported".to_string()),
126-
);
125+
"blocked group values management is not supported".to_string(),
126+
));
127127
}
128128

129129
groups.clear();

0 commit comments

Comments
 (0)