Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,9 @@ mod native;
pub use bytes::BytesDistinctCountAccumulator;
pub use bytes::BytesViewDistinctCountAccumulator;
pub use dict::DictionaryCountAccumulator;
pub use native::Bitmap65536DistinctCountAccumulator;
pub use native::Bitmap65536DistinctCountAccumulatorI16;
pub use native::BoolArray256DistinctCountAccumulator;
pub use native::BoolArray256DistinctCountAccumulatorI8;
pub use native::FloatDistinctCountAccumulator;
pub use native::PrimitiveDistinctCountAccumulator;
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,354 @@ impl<T: ArrowPrimitiveType + Debug> Accumulator for FloatDistinctCountAccumulato
size_of_val(self) + self.values.size()
}
}

/// Optimized COUNT DISTINCT accumulator for u8 using a bool array.
/// Uses 256 bytes to track all possible u8 values.
#[derive(Debug)]
pub struct BoolArray256DistinctCountAccumulator {
seen: Box<[bool; 256]>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can probably use a BooleanBuffer from Arrow to make this signifcantly faster (I think [bool uses a byte for each booelan) 🤔

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great idea ! Let me try and experiment with that

}

impl BoolArray256DistinctCountAccumulator {
pub fn new() -> Self {
Self {
seen: Box::new([false; 256]),
}
}

#[inline]
fn count(&self) -> i64 {
self.seen.iter().filter(|&&b| b).count() as i64
}
}

impl Default for BoolArray256DistinctCountAccumulator {
fn default() -> Self {
Self::new()
}
}

impl Accumulator for BoolArray256DistinctCountAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
if values.is_empty() {
return Ok(());
}

let arr = as_primitive_array::<arrow::datatypes::UInt8Type>(&values[0])?;
for value in arr.iter().flatten() {
self.seen[value as usize] = true;
}
Ok(())
}

fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
if states.is_empty() {
return Ok(());
}

let arr = as_list_array(&states[0])?;
arr.iter().try_for_each(|maybe_list| {
if let Some(list) = maybe_list {
let list = as_primitive_array::<arrow::datatypes::UInt8Type>(&list)?;
for value in list.values().iter() {
self.seen[*value as usize] = true;
}
};
Ok(())
})
}

fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
let values: Vec<u8> = self
.seen
.iter()
.enumerate()
.filter_map(|(idx, &seen)| if seen { Some(idx as u8) } else { None })
.collect();

let arr = Arc::new(
PrimitiveArray::<arrow::datatypes::UInt8Type>::from_iter_values(values),
);
Ok(vec![
SingleRowListArrayBuilder::new(arr).build_list_scalar(),
])
}

fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.count())))
}

fn size(&self) -> usize {
size_of_val(self) + 256
}
}

/// Optimized COUNT DISTINCT accumulator for i8 using a bool array.
/// Uses 256 bytes to track all possible i8 values (mapped to 0..255).
#[derive(Debug)]
pub struct BoolArray256DistinctCountAccumulatorI8 {
seen: Box<[bool; 256]>,
}

impl BoolArray256DistinctCountAccumulatorI8 {
pub fn new() -> Self {
Self {
seen: Box::new([false; 256]),
}
}

#[inline]
fn count(&self) -> i64 {
self.seen.iter().filter(|&&b| b).count() as i64
}
}

impl Default for BoolArray256DistinctCountAccumulatorI8 {
fn default() -> Self {
Self::new()
}
}

impl Accumulator for BoolArray256DistinctCountAccumulatorI8 {
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
if values.is_empty() {
return Ok(());
}

let arr = as_primitive_array::<arrow::datatypes::Int8Type>(&values[0])?;
for value in arr.iter().flatten() {
self.seen[value as u8 as usize] = true;
}
Ok(())
}

fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
if states.is_empty() {
return Ok(());
}

let arr = as_list_array(&states[0])?;
arr.iter().try_for_each(|maybe_list| {
if let Some(list) = maybe_list {
let list = as_primitive_array::<arrow::datatypes::Int8Type>(&list)?;
for value in list.values().iter() {
self.seen[*value as u8 as usize] = true;
}
};
Ok(())
})
}

fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
let values: Vec<i8> = self
.seen
.iter()
.enumerate()
.filter_map(
|(idx, &seen)| {
if seen { Some(idx as u8 as i8) } else { None }
},
)
.collect();

let arr = Arc::new(
PrimitiveArray::<arrow::datatypes::Int8Type>::from_iter_values(values),
);
Ok(vec![
SingleRowListArrayBuilder::new(arr).build_list_scalar(),
])
}

fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.count())))
}

fn size(&self) -> usize {
size_of_val(self) + 256
}
}

/// Optimized COUNT DISTINCT accumulator for u16 using a 65536-bit bitmap.
/// Uses 8KB (1024 x u64) to track all possible u16 values.
#[derive(Debug)]
pub struct Bitmap65536DistinctCountAccumulator {
bitmap: Box<[u64; 1024]>,
}

impl Bitmap65536DistinctCountAccumulator {
pub fn new() -> Self {
Self {
bitmap: Box::new([0; 1024]),
}
}

#[inline]
fn set_bit(&mut self, value: u16) {
let word = (value / 64) as usize;
let bit = value % 64;
self.bitmap[word] |= 1u64 << bit;
}

#[inline]
fn count(&self) -> i64 {
self.bitmap.iter().map(|w| w.count_ones() as i64).sum()
}
}

impl Default for Bitmap65536DistinctCountAccumulator {
fn default() -> Self {
Self::new()
}
}

impl Accumulator for Bitmap65536DistinctCountAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
if values.is_empty() {
return Ok(());
}

let arr = as_primitive_array::<arrow::datatypes::UInt16Type>(&values[0])?;
for value in arr.iter().flatten() {
self.set_bit(value);
}
Ok(())
}

fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
if states.is_empty() {
return Ok(());
}

let arr = as_list_array(&states[0])?;
arr.iter().try_for_each(|maybe_list| {
if let Some(list) = maybe_list {
let list = as_primitive_array::<arrow::datatypes::UInt16Type>(&list)?;
for value in list.values().iter() {
self.set_bit(*value);
}
};
Ok(())
})
}

fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
let mut values = Vec::new();
for (word_idx, &word) in self.bitmap.iter().enumerate() {
if word != 0 {
for bit in 0..64 {
if (word & (1u64 << bit)) != 0 {
values.push((word_idx as u16) * 64 + bit);
}
}
}
}

let arr = Arc::new(
PrimitiveArray::<arrow::datatypes::UInt16Type>::from_iter_values(values),
);
Ok(vec![
SingleRowListArrayBuilder::new(arr).build_list_scalar(),
])
}

fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.count())))
}

fn size(&self) -> usize {
size_of_val(self) + 8192
}
}

/// Optimized COUNT DISTINCT accumulator for i16 using a 65536-bit bitmap.
/// Uses 8KB (1024 x u64) to track all possible i16 values (mapped to 0..65535).
#[derive(Debug)]
pub struct Bitmap65536DistinctCountAccumulatorI16 {
bitmap: Box<[u64; 1024]>,
}

impl Bitmap65536DistinctCountAccumulatorI16 {
pub fn new() -> Self {
Self {
bitmap: Box::new([0; 1024]),
}
}

#[inline]
fn set_bit(&mut self, value: i16) {
let idx = value as u16;
let word = (idx / 64) as usize;
let bit = idx % 64;
self.bitmap[word] |= 1u64 << bit;
}

#[inline]
fn count(&self) -> i64 {
self.bitmap.iter().map(|w| w.count_ones() as i64).sum()
}
}

impl Default for Bitmap65536DistinctCountAccumulatorI16 {
fn default() -> Self {
Self::new()
}
}

impl Accumulator for Bitmap65536DistinctCountAccumulatorI16 {
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
if values.is_empty() {
return Ok(());
}

let arr = as_primitive_array::<arrow::datatypes::Int16Type>(&values[0])?;
for value in arr.iter().flatten() {
self.set_bit(value);
}
Ok(())
}

fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
if states.is_empty() {
return Ok(());
}

let arr = as_list_array(&states[0])?;
arr.iter().try_for_each(|maybe_list| {
if let Some(list) = maybe_list {
let list = as_primitive_array::<arrow::datatypes::Int16Type>(&list)?;
for value in list.values().iter() {
self.set_bit(*value);
}
};
Ok(())
})
}

fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
let mut values = Vec::new();
for (word_idx, &word) in self.bitmap.iter().enumerate() {
if word != 0 {
for bit in 0..64 {
if (word & (1u64 << bit)) != 0 {
values.push(((word_idx as u16) * 64 + bit) as i16);
}
}
}
}

let arr = Arc::new(
PrimitiveArray::<arrow::datatypes::Int16Type>::from_iter_values(values),
);
Ok(vec![
SingleRowListArrayBuilder::new(arr).build_list_scalar(),
])
}

fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.count())))
}

fn size(&self) -> usize {
size_of_val(self) + 8192
}
}
2 changes: 1 addition & 1 deletion datafusion/functions-aggregate/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ name = "approx_distinct"
harness = false

[[bench]]
name = "first_last"
name = "count_distinct"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please add this benchmark as a separate PR (so we can use our standard benchmark runner to confirm the results)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats sounds like a good idea . Let me put up a PR to add benchmarks to count_distinct , approx_distinct on a separate PR

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raised : #21521

harness = false

[[bench]]
Expand Down
Loading
Loading