diff --git a/src/query/functions/src/aggregates/aggregate_quantile_tdigest.rs b/src/query/functions/src/aggregates/aggregate_quantile_tdigest.rs index 54a231b4d76c9..84ecd741e810c 100644 --- a/src/query/functions/src/aggregates/aggregate_quantile_tdigest.rs +++ b/src/query/functions/src/aggregates/aggregate_quantile_tdigest.rs @@ -37,7 +37,6 @@ use databend_common_expression::types::number::*; use databend_common_expression::types::*; use databend_common_expression::with_decimal_mapped_type; use databend_common_expression::with_number_mapped_type; -use itertools::Itertools; use super::AggrState; use super::AggrStateLoc; @@ -55,59 +54,63 @@ pub(crate) const MEDIAN: u8 = 0; pub(crate) const QUANTILE: u8 = 1; #[derive(BorshSerialize, BorshDeserialize)] -pub(crate) struct QuantileTDigestState { - epsilon: u32, - max_centroids: usize, +pub(crate) enum QuantileTDigestState { + Normal(TDigestData), + Nan, +} +#[derive(BorshSerialize, BorshDeserialize)] +pub(crate) struct TDigestData { total_weight: f64, - weights: Vec, - means: Vec, + centroids: Vec, unmerged_total_weight: f64, - unmerged_weights: Vec, - unmerged_means: Vec, + unmerged: Vec, min: f64, max: f64, } +#[derive(Clone, Copy, BorshSerialize, BorshDeserialize)] +struct Centroid { + mean: f64, + weight: f64, +} + impl QuantileTDigestState { pub(crate) fn new() -> Self { - Self { - epsilon: 100u32, - max_centroids: 2048, - total_weight: 0f64, - weights: vec![], - means: vec![], - unmerged_total_weight: 0f64, - unmerged_weights: vec![], - unmerged_means: vec![], - min: 0f64, - max: 0f64, - } + Self::Normal(TDigestData { + total_weight: 0.0, + centroids: vec![], + unmerged_total_weight: 0.0, + unmerged: vec![], + min: f64::INFINITY, + max: f64::NEG_INFINITY, + }) } pub(crate) fn add(&mut self, other: f64, weight: Option) { - if self.unmerged_weights.len() + self.weights.len() >= self.max_centroids - 1 { - self.compress(); + let weight = weight.unwrap_or(1) as f64; + if weight == 0.0 { + return; + } + if other.is_nan() { + *self = Self::Nan; + return; } - self.unmerged_weights.push(weight.unwrap_or(1) as f64); - self.unmerged_means.push(other); - self.unmerged_total_weight += 1f64; + let Self::Normal(state) = self else { + return; + }; + state.add_finite(other, weight); } pub(crate) fn merge(&mut self, rhs: &mut Self) -> Result<()> { - if rhs.len() == 0 { - return Ok(()); + match (&mut *self, rhs) { + (Self::Nan, _) | (_, Self::Nan) => { + *self = Self::Nan; + } + (Self::Normal(state), Self::Normal(rhs)) => state.merge(rhs)?, } - - rhs.compress(); - - self.unmerged_weights.extend_from_slice(&rhs.weights); - self.unmerged_means.extend_from_slice(&rhs.means); - self.unmerged_total_weight = rhs.weights.iter().sum(); - self.compress(); - Ok(()) } @@ -135,74 +138,112 @@ impl QuantileTDigestState { } pub(crate) fn quantile(&mut self, level: f64) -> f64 { - self.compress(); - if self.weights.is_empty() { - return 0f64; - } else if self.weights.len() == 1 { - return self.means[0]; + match self { + Self::Normal(state) => state.quantile(level), + Self::Nan => f64::NAN, } + } +} - let mean_last = self.means.len() - 1; - let weight_last = self.weights.len() - 1; +impl TDigestData { + const EPSILON: f64 = 100.0; + const MAX_CENTROIDS: usize = 2048; + + fn add_finite(&mut self, other: f64, weight: f64) { + if self.unmerged.len() + self.centroids.len() >= Self::MAX_CENTROIDS - 1 { + self.compress(); + } + + self.unmerged.push(Centroid { + mean: other, + weight, + }); + self.unmerged_total_weight += weight; + } + + fn merge(&mut self, rhs: &mut Self) -> Result<()> { + if rhs.len() == 0 { + return Ok(()); + } + + rhs.compress(); + + self.unmerged.extend_from_slice(&rhs.centroids); + self.unmerged_total_weight += rhs + .centroids + .iter() + .map(|centroid| centroid.weight) + .sum::(); + self.compress(); + + Ok(()) + } + + fn quantile(&mut self, level: f64) -> f64 { + self.compress(); + let (first, last) = match self.centroids.as_slice() { + [] => return 0.0, + [Centroid { mean, .. }] => return *mean, + [first, .., last] => (*first, *last), + }; let index = level * self.total_weight; - if index < 1f64 { + if index < 1.0 { return self.min; } - if self.weights[0] > 1f64 && index < self.weights[0] / 2f64 { - return self.min - + (index - 1f64) / (self.weights[0] / 2f64 - 1f64) * (self.means[0] - self.min); + if first.weight > 1.0 && index < first.weight / 2.0 { + return self.min + (index - 1.0) / (first.weight / 2.0 - 1.0) * (first.mean - self.min); } - if index > self.total_weight - 1f64 { + if index > self.total_weight - 1.0 { return self.max; } - if self.weights[weight_last] > 1f64 - && self.total_weight - index <= self.weights[weight_last] / 2f64 - { + if last.weight > 1.0 && self.total_weight - index <= last.weight / 2.0 { + if last.weight / 2.0 <= 1.0 { + return self.max; + } return self.max - - (self.total_weight - index - 1f64) / (self.weights[weight_last] / 2f64 - 1f64) - * (self.max - self.means[mean_last]); + - (self.total_weight - index - 1.0) / (last.weight / 2.0 - 1.0) + * (self.max - last.mean); } - let mut weight_so_far = self.weights[0] / 2f64; - for i in 0..(self.weights.len() - 1) { - let dw = (self.weights[i] + self.weights[i + 1]) / 2f64; + let mut weight_so_far = first.weight / 2.0; + for (left, right) in self + .centroids + .windows(2) + .map(|centroids| (centroids[0], centroids[1])) + { + let dw = (left.weight + right.weight) / 2.0; if weight_so_far + dw > index { - let mut left_unit = 0f64; - if self.weights[i] == 1f64 { + let mut left_unit = 0.0; + if left.weight == 1.0 { if index - weight_so_far < 0.5 { - return self.means[i]; + return left.mean; } left_unit = 0.5; } - let mut right_unit = 0f64; - if self.weights[i + 1] == 1f64 { + let mut right_unit = 0.0; + if right.weight == 1.0 { if weight_so_far + dw - index <= 0.5 { - return self.means[i + 1]; + return right.mean; } right_unit = 0.5; } let z1 = index - weight_so_far - left_unit; let z2 = weight_so_far + dw - index - right_unit; - return QuantileTDigestState::weighted_average( - self.means[i], - z2, - self.means[i + 1], - z1, - ); + return Self::weighted_average(left.mean, z2, right.mean, z1); } weight_so_far += dw; } debug_assert!(index <= self.total_weight); - debug_assert!(index >= self.total_weight - self.weights[weight_last] / 2f64); + debug_assert!(index >= self.total_weight - last.weight / 2.0); - let z1 = index - self.total_weight - self.weights[weight_last] / 2f64; - let z2 = self.weights[weight_last] / 2f64 - z1; + let z1 = index - self.total_weight - last.weight / 2.0; + let z2 = last.weight / 2.0 - z1; - QuantileTDigestState::weighted_average(self.means[mean_last], z1, self.max, z2) + Self::weighted_average(last.mean, z1, self.max, z2) } fn len(&self) -> usize { @@ -218,65 +259,49 @@ impl QuantileTDigestState { } fn compress(&mut self) { - if self.unmerged_total_weight > 0f64 { - self.merge_centroid(self.unmerged_weights.clone(), self.unmerged_means.clone()); - self.unmerged_weights.clear(); - self.unmerged_means.clear(); - self.unmerged_total_weight = 0f64; + if self.unmerged_total_weight > 0.0 { + self.merge_centroid(self.unmerged.clone()); + self.unmerged.clear(); + self.unmerged_total_weight = 0.0; } } - fn merge_centroid(&mut self, incoming_weights: Vec, incoming_means: Vec) { - let mut incoming_weights = incoming_weights; - incoming_weights.extend_from_slice(&self.weights); - let mut incoming_means = incoming_means; - incoming_means.extend_from_slice(&self.means); - - // sort (0..incoming_means.len()) by values in incoming_means. - // e.g. incoming_means[5.0, 2.0, 9.1, 1.3] => [3, 1, 0, 2] - let incoming_order = (0..incoming_means.len()) - .sorted_by(|&i, &j| incoming_means[i].partial_cmp(&incoming_means[j]).unwrap()) - .collect::>(); + fn merge_centroid(&mut self, mut incoming: Vec) { + incoming.extend_from_slice(&self.centroids); + incoming.sort_by(|a, b| a.mean.total_cmp(&b.mean)); self.total_weight += self.unmerged_total_weight; - let normalizer = self.epsilon as f64 / (PI * self.total_weight); - - let mut weights = vec![]; - let mut means = vec![]; - - weights.push(incoming_weights[incoming_order[0]]); - means.push(incoming_means[incoming_order[0]]); + let normalizer = Self::EPSILON / (PI * self.total_weight); - let mut weight_so_far = 0f64; + let mut incoming = incoming.into_iter(); + let mut current = incoming.next().unwrap(); + let first_mean = current.mean; + let mut centroids = vec![]; + let mut weight_so_far = 0.0; - for idx in &incoming_order[1..] { - let idx = *idx; - let proposed_weight = weights[weights.len() - 1] + incoming_weights[idx]; + for centroid in incoming { + let proposed_weight = current.weight + centroid.weight; let z = normalizer * proposed_weight; let q0 = weight_so_far / self.total_weight; let q2 = (weight_so_far + proposed_weight) / self.total_weight; - let weight_last = weights.len() - 1; - let mean_last = means.len() - 1; - if z * z <= q0 * (1f64 - q0) && z * z <= q2 * (1f64 - q2) { - weights[weight_last] += incoming_weights[idx]; - means[mean_last] = means[mean_last] - + (incoming_means[idx] - means[mean_last]) * incoming_weights[idx] - / weights[weight_last]; + if z * z <= q0 * (1.0 - q0) && z * z <= q2 * (1.0 - q2) { + current.weight = proposed_weight; + current.mean += (centroid.mean - current.mean) * centroid.weight / current.weight; } else { - weight_so_far += weights[weight_last]; - weights.push(incoming_weights[idx]); - means.push(incoming_means[idx]); + weight_so_far += current.weight; + centroids.push(current); + current = centroid; } } - if self.total_weight > 0f64 { - self.min = f64::min(self.min, means[0]); - self.max = f64::max(self.max, means[means.len() - 1]); + if self.total_weight > 0.0 { + self.min = f64::min(self.min, first_mean); + self.max = f64::max(self.max, current.mean); } - self.weights = weights; - self.means = means; + centroids.push(current); + self.centroids = centroids; } } diff --git a/src/query/functions/tests/it/aggregates/agg.rs b/src/query/functions/tests/it/aggregates/agg.rs index b7205ece5ba5a..1b7317a46fcf2 100644 --- a/src/query/functions/tests/it/aggregates/agg.rs +++ b/src/query/functions/tests/it/aggregates/agg.rs @@ -108,11 +108,7 @@ fn test_aggr_functions() { test_agg_quantile_disc(file, eval_aggr); test_agg_quantile_cont(file, eval_aggr); test_agg_quantile_tdigest(file, eval_aggr); - // FIXME - test_agg_quantile_tdigest_weighted(file, |name, params, columns, rows, _sort_descs| { - let block_entries = columns.to_vec(); - eval_aggr_for_test(name, params, &block_entries, rows, false, false, vec![]) - }); + test_agg_quantile_tdigest_weighted(file, eval_aggr); test_agg_median(file, eval_aggr); test_agg_median_tdigest(file, eval_aggr); test_agg_array_agg(file, eval_aggr); diff --git a/src/query/functions/tests/it/aggregates/mod.rs b/src/query/functions/tests/it/aggregates/mod.rs index e4477467926c9..3e2dfc7d8dcba 100644 --- a/src/query/functions/tests/it/aggregates/mod.rs +++ b/src/query/functions/tests/it/aggregates/mod.rs @@ -16,6 +16,7 @@ mod agg; mod agg_hashtable; +mod quantile_tdigest; use std::io::Write; diff --git a/src/query/functions/tests/it/aggregates/quantile_tdigest.rs b/src/query/functions/tests/it/aggregates/quantile_tdigest.rs new file mode 100644 index 0000000000000..49b3bc66f595b --- /dev/null +++ b/src/query/functions/tests/it/aggregates/quantile_tdigest.rs @@ -0,0 +1,518 @@ +// Copyright 2026 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::Write; + +use bumpalo::Bump; +use databend_common_base::runtime::drop_guard; +use databend_common_exception::Result; +use databend_common_expression::AggrState; +use databend_common_expression::AggregateFunctionRef; +use databend_common_expression::BlockEntry; +use databend_common_expression::Column; +use databend_common_expression::ColumnBuilder; +use databend_common_expression::FromData; +use databend_common_expression::StateAddr; +use databend_common_expression::get_states_layout; +use databend_common_expression::types::DataType; +use databend_common_expression::types::Float64Type; +use databend_common_expression::types::Int64Type; +use databend_common_expression::types::number::UInt64Type; +use databend_common_functions::aggregates::AggregateFunctionFactory; +use databend_common_functions::aggregates::AggregateFunctionSortDesc; +use goldenfile::Mint; + +use super::eval_aggr_for_test; +use super::run_agg_ast; + +struct StateDropGuard { + func: AggregateFunctionRef, + loc: Box<[databend_common_expression::AggrStateLoc]>, + addrs: Vec, +} + +impl Drop for StateDropGuard { + fn drop(&mut self) { + drop_guard(|| { + if self.func.need_manual_drop_state() { + for addr in &self.addrs { + unsafe { + self.func.drop_state(AggrState::new(*addr, &self.loc)); + } + } + } + }); + } +} + +fn simulate_accumulate_matches_rows( + name: &str, + params: Vec, + entries: &[BlockEntry], + rows: usize, + sort_descs: Vec, +) -> Result<(Column, DataType)> { + let factory = AggregateFunctionFactory::instance(); + let arguments = entries.iter().map(BlockEntry::data_type).collect(); + let func = factory.get(name, params.clone(), arguments, sort_descs.clone())?; + let data_type = func.return_type()?; + let states_layout = get_states_layout(&[func.clone()])?; + let loc = states_layout.states_loc[0].clone(); + + let arena = Bump::new(); + let batch_addr: StateAddr = arena.alloc_layout(states_layout.layout).into(); + let rows_addr: StateAddr = arena.alloc_layout(states_layout.layout).into(); + let batch_state = AggrState::new(batch_addr, &loc); + let rows_state = AggrState::new(rows_addr, &loc); + func.init_state(batch_state); + func.init_state(rows_state); + let _drop_guard = StateDropGuard { + func: func.clone(), + loc: loc.clone(), + addrs: vec![batch_addr, rows_addr], + }; + + func.accumulate(batch_state, entries.into(), None, rows)?; + for row in 0..rows { + func.accumulate_row(rows_state, entries.into(), row)?; + } + + let mut batch_builder = ColumnBuilder::with_capacity(&data_type, 1); + func.merge_result(batch_state, false, &mut batch_builder)?; + let batch_column = batch_builder.build(); + + let mut rows_builder = ColumnBuilder::with_capacity(&data_type, 1); + func.merge_result(rows_state, false, &mut rows_builder)?; + let rows_column = rows_builder.build(); + + assert_eq!(batch_column, rows_column); + + let batch_roundtrip = eval_aggr_for_test( + name, + params.clone(), + entries, + rows, + false, + true, + sort_descs.clone(), + )?; + let rows_roundtrip = eval_aggr_for_test(name, params, entries, rows, true, true, sort_descs)?; + assert_eq!(batch_roundtrip.0, batch_column); + assert_eq!(rows_roundtrip.0, rows_column); + + Ok((batch_column, data_type)) +} + +#[test] +fn test_quantile_tdigest_edge_cases() { + let mut mint = Mint::new("tests/it/aggregates/testdata"); + let file = &mut mint.new_goldenfile("quantile_tdigest.txt").unwrap(); + + test_tdigest_empty_input(file); + test_tdigest_singleton_input(file); + test_tdigest_min_max_endpoints(file); + test_tdigest_weighted_interior_interpolation(file); + test_tdigest_singleton_boundaries(file); + test_tdigest_weighted_merged_centroid(file); + test_tdigest_weighted_zero_weight(file); + test_tdigest_weighted_tail_boundary(file); + test_tdigest_merge_empty_right(file); + test_tdigest_weighted_nan_input(file); + test_tdigest_weighted_zero_weight_nan(file); + test_tdigest_weighted_merge_nan_only_right(file); + test_tdigest_nan_input(file); + test_tdigest_merge_nan_only_right(file); + test_tdigest_merge_with_uncompressed_left(file); + test_tdigest_group_by_nan_input(file); + test_tdigest_weighted_group_by_nan_input(file); +} + +fn test_tdigest_empty_input(file: &mut impl Write) { + let columns = [("v", Int64Type::from_data(Vec::::new()).into())]; + run_agg_ast( + file, + "quantile_tdigest(0.5)(v)", + &columns, + simulate_accumulate_matches_rows, + vec![], + ); +} + +fn test_tdigest_singleton_input(file: &mut impl Write) { + let columns = [("v", Int64Type::from_data(vec![42_i64]).into())]; + run_agg_ast( + file, + "quantile_tdigest(0.5)(v)", + &columns, + simulate_accumulate_matches_rows, + vec![], + ); +} + +fn test_tdigest_min_max_endpoints(file: &mut impl Write) { + let positive_values = [("v", Int64Type::from_data(vec![1_i64, 2, 3]).into())]; + run_agg_ast( + file, + "quantile_tdigest(0, 1)(v)", + &positive_values, + simulate_accumulate_matches_rows, + vec![], + ); + + let negative_values = [("v", Int64Type::from_data(vec![-3_i64, -2, -1]).into())]; + run_agg_ast( + file, + "quantile_tdigest(0, 1)(v)", + &negative_values, + simulate_accumulate_matches_rows, + vec![], + ); +} + +fn test_tdigest_weighted_interior_interpolation(file: &mut impl Write) { + let columns = [ + ("v", Int64Type::from_data(vec![0_i64, 10]).into()), + ("w", UInt64Type::from_data(vec![2_u64, 2]).into()), + ]; + run_agg_ast( + file, + "quantile_tdigest_weighted(0.5)(v, w)", + &columns, + simulate_accumulate_matches_rows, + vec![], + ); +} + +fn test_tdigest_singleton_boundaries(file: &mut impl Write) { + let columns = [("v", Int64Type::from_data(vec![0_i64, 10, 20, 30]).into())]; + run_agg_ast( + file, + "quantile_tdigest(0.4, 0.6)(v)", + &columns, + simulate_accumulate_matches_rows, + vec![], + ); +} + +fn test_tdigest_weighted_merged_centroid(file: &mut impl Write) { + let columns = [ + ( + "v", + Float64Type::from_data(vec![0.0_f64, 50.0, 50.1, 100.0]).into(), + ), + ("w", UInt64Type::from_data(vec![499_u64, 1, 1, 499]).into()), + ]; + run_agg_ast( + file, + "quantile_tdigest_weighted(0.5)(v, w)", + &columns, + simulate_accumulate_matches_rows, + vec![], + ); +} + +fn test_tdigest_weighted_zero_weight(file: &mut impl Write) { + let columns = [ + ("v", Int64Type::from_data(vec![0_i64, 10]).into()), + ("w", UInt64Type::from_data(vec![0_u64, 1]).into()), + ]; + run_agg_ast( + file, + "quantile_tdigest_weighted(0)(v, w)", + &columns, + simulate_accumulate_matches_rows, + vec![], + ); +} + +fn test_tdigest_weighted_tail_boundary(file: &mut impl Write) { + let columns = [ + ("v", Int64Type::from_data(vec![0_i64, 1, 2]).into()), + ("w", UInt64Type::from_data(vec![1_u64, 1, 2]).into()), + ]; + run_agg_ast( + file, + "quantile_tdigest_weighted(0.75)(v, w)", + &columns, + simulate_accumulate_matches_rows, + vec![], + ); +} + +fn test_tdigest_merge_empty_right(file: &mut impl Write) { + let columns = [("v", Int64Type::from_data(vec![1_i64, 2, 3]).into())]; + run_agg_ast( + file, + "quantile_tdigest(0.5)(v)", + &columns, + simulate_merge_empty_right, + vec![], + ); +} + +fn test_tdigest_weighted_nan_input(file: &mut impl Write) { + let columns = [ + ( + "v", + Float64Type::from_data(vec![1.0_f64, f64::NAN, 2.0]).into(), + ), + ("w", UInt64Type::from_data(vec![1_u64, 1, 1]).into()), + ]; + run_agg_ast( + file, + "quantile_tdigest_weighted(0.5)(v, w)", + &columns, + simulate_accumulate_matches_rows, + vec![], + ); +} + +fn test_tdigest_weighted_zero_weight_nan(file: &mut impl Write) { + let columns = [ + ("v", Float64Type::from_data(vec![f64::NAN, 10.0_f64]).into()), + ("w", UInt64Type::from_data(vec![0_u64, 1]).into()), + ]; + run_agg_ast( + file, + "quantile_tdigest_weighted(0)(v, w)", + &columns, + simulate_accumulate_matches_rows, + vec![], + ); +} + +fn test_tdigest_weighted_merge_nan_only_right(file: &mut impl Write) { + let columns = [ + ("v", Float64Type::from_data(vec![1.0_f64, f64::NAN]).into()), + ("w", UInt64Type::from_data(vec![1_u64, 1]).into()), + ]; + run_agg_ast( + file, + "quantile_tdigest_weighted(0.5)(v, w)", + &columns, + simulate_merge_last_row_into_left, + vec![], + ); +} + +fn test_tdigest_nan_input(file: &mut impl Write) { + let columns = [( + "v", + Float64Type::from_data(vec![1.0_f64, f64::NAN, 2.0]).into(), + )]; + run_agg_ast( + file, + "quantile_tdigest(0.5)(v)", + &columns, + simulate_accumulate_matches_rows, + vec![], + ); +} + +fn test_tdigest_merge_nan_only_right(file: &mut impl Write) { + let columns = [("v", Float64Type::from_data(vec![1.0_f64, f64::NAN]).into())]; + run_agg_ast( + file, + "quantile_tdigest(0.5)(v)", + &columns, + simulate_merge_last_row_into_left, + vec![], + ); +} + +fn test_tdigest_merge_with_uncompressed_left(file: &mut impl Write) { + let columns = [ + ("v", Int64Type::from_data(vec![0_i64, 100, 50]).into()), + ("w", UInt64Type::from_data(vec![1_u64, 1, 10]).into()), + ]; + run_agg_ast( + file, + "quantile_tdigest_weighted(0.5)(v, w)", + &columns, + simulate_merge_into_uncompressed_left, + vec![], + ); +} + +fn test_tdigest_group_by_nan_input(file: &mut impl Write) { + let columns = [( + "v", + Float64Type::from_data(vec![1.0_f64, 10.0, f64::NAN, 20.0]).into(), + )]; + run_agg_ast( + file, + "quantile_tdigest(0.5)(v)", + &columns, + simulate_accumulate_keys_matches_rows, + vec![], + ); +} + +fn test_tdigest_weighted_group_by_nan_input(file: &mut impl Write) { + let columns = [ + ( + "v", + Float64Type::from_data(vec![1.0_f64, 10.0, f64::NAN, 20.0]).into(), + ), + ("w", UInt64Type::from_data(vec![1_u64, 1, 1, 1]).into()), + ]; + run_agg_ast( + file, + "quantile_tdigest_weighted(0.5)(v, w)", + &columns, + simulate_accumulate_keys_matches_rows, + vec![], + ); +} + +fn simulate_accumulate_keys_matches_rows( + name: &str, + params: Vec, + entries: &[BlockEntry], + rows: usize, + sort_descs: Vec, +) -> Result<(Column, DataType)> { + let factory = AggregateFunctionFactory::instance(); + let arguments = entries.iter().map(BlockEntry::data_type).collect(); + let func = factory.get(name, params, arguments, sort_descs)?; + let data_type = func.return_type()?; + let states_layout = get_states_layout(&[func.clone()])?; + let loc = states_layout.states_loc[0].clone(); + + let arena = Bump::new(); + let keys_left_addr: StateAddr = arena.alloc_layout(states_layout.layout).into(); + let keys_right_addr: StateAddr = arena.alloc_layout(states_layout.layout).into(); + let rows_left_addr: StateAddr = arena.alloc_layout(states_layout.layout).into(); + let rows_right_addr: StateAddr = arena.alloc_layout(states_layout.layout).into(); + + let keys_left = AggrState::new(keys_left_addr, &loc); + let keys_right = AggrState::new(keys_right_addr, &loc); + let rows_left = AggrState::new(rows_left_addr, &loc); + let rows_right = AggrState::new(rows_right_addr, &loc); + for state in [keys_left, keys_right, rows_left, rows_right] { + func.init_state(state); + } + let _drop_guard = StateDropGuard { + func: func.clone(), + loc: loc.clone(), + addrs: vec![ + keys_left_addr, + keys_right_addr, + rows_left_addr, + rows_right_addr, + ], + }; + + let places = (0..rows) + .map(|i| { + if i % 2 == 0 { + keys_left_addr + } else { + keys_right_addr + } + }) + .collect::>(); + func.accumulate_keys(&places, &loc, entries.into(), rows)?; + + for row in 0..rows { + let state = if row % 2 == 0 { rows_left } else { rows_right }; + func.accumulate_row(state, entries.into(), row)?; + } + + let mut keys_builder = ColumnBuilder::with_capacity(&data_type, 2); + func.merge_result(keys_left, false, &mut keys_builder)?; + func.merge_result(keys_right, false, &mut keys_builder)?; + let keys_column = keys_builder.build(); + + let mut rows_builder = ColumnBuilder::with_capacity(&data_type, 2); + func.merge_result(rows_left, false, &mut rows_builder)?; + func.merge_result(rows_right, false, &mut rows_builder)?; + let rows_column = rows_builder.build(); + + assert_eq!(keys_column, rows_column); + Ok((keys_column, data_type)) +} + +fn simulate_merge_last_row_into_left( + name: &str, + params: Vec, + entries: &[BlockEntry], + rows: usize, + sort_descs: Vec, +) -> Result<(Column, DataType)> { + assert!(rows > 1); + simulate_merge_split(name, params, entries, rows, sort_descs, rows - 1) +} + +fn simulate_merge_empty_right( + name: &str, + params: Vec, + entries: &[BlockEntry], + rows: usize, + sort_descs: Vec, +) -> Result<(Column, DataType)> { + simulate_merge_split(name, params, entries, rows, sort_descs, rows) +} + +fn simulate_merge_into_uncompressed_left( + name: &str, + params: Vec, + entries: &[BlockEntry], + rows: usize, + sort_descs: Vec, +) -> Result<(Column, DataType)> { + assert_eq!(rows, 3); + simulate_merge_split(name, params, entries, rows, sort_descs, 2) +} + +fn simulate_merge_split( + name: &str, + params: Vec, + entries: &[BlockEntry], + rows: usize, + sort_descs: Vec, + right_start: usize, +) -> Result<(Column, DataType)> { + let factory = AggregateFunctionFactory::instance(); + let arguments = entries.iter().map(BlockEntry::data_type).collect(); + let func = factory.get(name, params, arguments, sort_descs)?; + let data_type = func.return_type()?; + let states_layout = get_states_layout(&[func.clone()])?; + let loc = states_layout.states_loc[0].clone(); + + let arena = Bump::new(); + let left_addr: StateAddr = arena.alloc_layout(states_layout.layout).into(); + let right_addr: StateAddr = arena.alloc_layout(states_layout.layout).into(); + let left = AggrState::new(left_addr, &loc); + let right = AggrState::new(right_addr, &loc); + func.init_state(left); + func.init_state(right); + let _drop_guard = StateDropGuard { + func: func.clone(), + loc: loc.clone(), + addrs: vec![left_addr, right_addr], + }; + + for row in 0..right_start { + func.accumulate_row(left, entries.into(), row)?; + } + for row in right_start..rows { + func.accumulate_row(right, entries.into(), row)?; + } + func.merge_states(left, right)?; + + let mut builder = ColumnBuilder::with_capacity(&data_type, 1); + func.merge_result(left, false, &mut builder)?; + Ok((builder.build(), data_type)) +} diff --git a/src/query/functions/tests/it/aggregates/testdata/agg.txt b/src/query/functions/tests/it/aggregates/testdata/agg.txt index 01ff5f83813b5..0f8b2afdf7c4e 100644 --- a/src/query/functions/tests/it/aggregates/testdata/agg.txt +++ b/src/query/functions/tests/it/aggregates/testdata/agg.txt @@ -1091,7 +1091,7 @@ evaluation (internal): +--------+-----------------------------------------------------------------+ | a | Column(Int64([4, 3, 2, 1])) | | b | Column(UInt64([1, 2, 3, 4])) | -| Output | NullableColumn { column: Float64([4]), validity: [0b_______1] } | +| Output | NullableColumn { column: Float64([3]), validity: [0b_______1] } | +--------+-----------------------------------------------------------------+ diff --git a/src/query/functions/tests/it/aggregates/testdata/quantile_tdigest.txt b/src/query/functions/tests/it/aggregates/testdata/quantile_tdigest.txt new file mode 100644 index 0000000000000..36b5771d16184 --- /dev/null +++ b/src/query/functions/tests/it/aggregates/testdata/quantile_tdigest.txt @@ -0,0 +1,189 @@ +ast: quantile_tdigest(0.5)(v) +evaluation (internal): ++--------+-----------------------------------------------------------------+ +| Column | Data | ++--------+-----------------------------------------------------------------+ +| v | Column(Int64([])) | +| Output | NullableColumn { column: Float64([0]), validity: [0b_______0] } | ++--------+-----------------------------------------------------------------+ + + +ast: quantile_tdigest(0.5)(v) +evaluation (internal): ++--------+------------------------------------------------------------------+ +| Column | Data | ++--------+------------------------------------------------------------------+ +| v | Column(Int64([42])) | +| Output | NullableColumn { column: Float64([42]), validity: [0b_______1] } | ++--------+------------------------------------------------------------------+ + + +ast: quantile_tdigest(0, 1)(v) +evaluation (internal): ++--------+-------------------------------------------------------------------------------------------------------------+ +| Column | Data | ++--------+-------------------------------------------------------------------------------------------------------------+ +| v | Column(Int64([1, 2, 3])) | +| Output | NullableColumn { column: ArrayColumn { values: Float64([1, 3]), offsets: [0, 2] }, validity: [0b_______1] } | ++--------+-------------------------------------------------------------------------------------------------------------+ + + +ast: quantile_tdigest(0, 1)(v) +evaluation (internal): ++--------+---------------------------------------------------------------------------------------------------------------+ +| Column | Data | ++--------+---------------------------------------------------------------------------------------------------------------+ +| v | Column(Int64([-3, -2, -1])) | +| Output | NullableColumn { column: ArrayColumn { values: Float64([-3, -1]), offsets: [0, 2] }, validity: [0b_______1] } | ++--------+---------------------------------------------------------------------------------------------------------------+ + + +ast: quantile_tdigest_weighted(0.5)(v, w) +evaluation (internal): ++--------+-----------------------------------------------------------------+ +| Column | Data | ++--------+-----------------------------------------------------------------+ +| v | Column(Int64([0, 10])) | +| w | Column(UInt64([2, 2])) | +| Output | NullableColumn { column: Float64([5]), validity: [0b_______1] } | ++--------+-----------------------------------------------------------------+ + + +ast: quantile_tdigest(0.4, 0.6)(v) +evaluation (internal): ++--------+---------------------------------------------------------------------------------------------------------------+ +| Column | Data | ++--------+---------------------------------------------------------------------------------------------------------------+ +| v | Column(Int64([0, 10, 20, 30])) | +| Output | NullableColumn { column: ArrayColumn { values: Float64([10, 20]), offsets: [0, 2] }, validity: [0b_______1] } | ++--------+---------------------------------------------------------------------------------------------------------------+ + + +ast: quantile_tdigest_weighted(0.5)(v, w) +evaluation (internal): ++--------+---------------------------------------------------------------------+ +| Column | Data | ++--------+---------------------------------------------------------------------+ +| v | Column(Float64([0, 50, 50.1, 100])) | +| w | Column(UInt64([499, 1, 1, 499])) | +| Output | NullableColumn { column: Float64([50.05]), validity: [0b_______1] } | ++--------+---------------------------------------------------------------------+ + + +ast: quantile_tdigest_weighted(0)(v, w) +evaluation (internal): ++--------+------------------------------------------------------------------+ +| Column | Data | ++--------+------------------------------------------------------------------+ +| v | Column(Int64([0, 10])) | +| w | Column(UInt64([0, 1])) | +| Output | NullableColumn { column: Float64([10]), validity: [0b_______1] } | ++--------+------------------------------------------------------------------+ + + +ast: quantile_tdigest_weighted(0.75)(v, w) +evaluation (internal): ++--------+-----------------------------------------------------------------+ +| Column | Data | ++--------+-----------------------------------------------------------------+ +| v | Column(Int64([0, 1, 2])) | +| w | Column(UInt64([1, 1, 2])) | +| Output | NullableColumn { column: Float64([2]), validity: [0b_______1] } | ++--------+-----------------------------------------------------------------+ + + +ast: quantile_tdigest(0.5)(v) +evaluation (internal): ++--------+-----------------------------------------------------------------+ +| Column | Data | ++--------+-----------------------------------------------------------------+ +| v | Column(Int64([1, 2, 3])) | +| Output | NullableColumn { column: Float64([2]), validity: [0b_______1] } | ++--------+-----------------------------------------------------------------+ + + +ast: quantile_tdigest_weighted(0.5)(v, w) +evaluation (internal): ++--------+-------------------------------------------------------------------+ +| Column | Data | ++--------+-------------------------------------------------------------------+ +| v | Column(Float64([1, NaN, 2])) | +| w | Column(UInt64([1, 1, 1])) | +| Output | NullableColumn { column: Float64([NaN]), validity: [0b_______1] } | ++--------+-------------------------------------------------------------------+ + + +ast: quantile_tdigest_weighted(0)(v, w) +evaluation (internal): ++--------+------------------------------------------------------------------+ +| Column | Data | ++--------+------------------------------------------------------------------+ +| v | Column(Float64([NaN, 10])) | +| w | Column(UInt64([0, 1])) | +| Output | NullableColumn { column: Float64([10]), validity: [0b_______1] } | ++--------+------------------------------------------------------------------+ + + +ast: quantile_tdigest_weighted(0.5)(v, w) +evaluation (internal): ++--------+-------------------------------------------------------------------+ +| Column | Data | ++--------+-------------------------------------------------------------------+ +| v | Column(Float64([1, NaN])) | +| w | Column(UInt64([1, 1])) | +| Output | NullableColumn { column: Float64([NaN]), validity: [0b_______1] } | ++--------+-------------------------------------------------------------------+ + + +ast: quantile_tdigest(0.5)(v) +evaluation (internal): ++--------+-------------------------------------------------------------------+ +| Column | Data | ++--------+-------------------------------------------------------------------+ +| v | Column(Float64([1, NaN, 2])) | +| Output | NullableColumn { column: Float64([NaN]), validity: [0b_______1] } | ++--------+-------------------------------------------------------------------+ + + +ast: quantile_tdigest(0.5)(v) +evaluation (internal): ++--------+-------------------------------------------------------------------+ +| Column | Data | ++--------+-------------------------------------------------------------------+ +| v | Column(Float64([1, NaN])) | +| Output | NullableColumn { column: Float64([NaN]), validity: [0b_______1] } | ++--------+-------------------------------------------------------------------+ + + +ast: quantile_tdigest_weighted(0.5)(v, w) +evaluation (internal): ++--------+------------------------------------------------------------------+ +| Column | Data | ++--------+------------------------------------------------------------------+ +| v | Column(Int64([0, 100, 50])) | +| w | Column(UInt64([1, 1, 10])) | +| Output | NullableColumn { column: Float64([50]), validity: [0b_______1] } | ++--------+------------------------------------------------------------------+ + + +ast: quantile_tdigest(0.5)(v) +evaluation (internal): ++--------+-----------------------------------------------------------------------+ +| Column | Data | ++--------+-----------------------------------------------------------------------+ +| v | Column(Float64([1, 10, NaN, 20])) | +| Output | NullableColumn { column: Float64([NaN, 20]), validity: [0b______11] } | ++--------+-----------------------------------------------------------------------+ + + +ast: quantile_tdigest_weighted(0.5)(v, w) +evaluation (internal): ++--------+-----------------------------------------------------------------------+ +| Column | Data | ++--------+-----------------------------------------------------------------------+ +| v | Column(Float64([1, 10, NaN, 20])) | +| w | Column(UInt64([1, 1, 1, 1])) | +| Output | NullableColumn { column: Float64([NaN, 20]), validity: [0b______11] } | ++--------+-----------------------------------------------------------------------+ + +