From fe226dd055c47a3f8ec2085930ff5579b12bdc25 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 2 Apr 2026 16:17:06 +0800 Subject: [PATCH 1/6] Update min_max.rs to support dictionary scalars Return ScalarValue::Dictionary(...) in dictionary batches instead of unwrapping to inner scalars. Enhance min_max! logic to safely handle dictionary-vs-dictionary and dictionary-vs-non-dictionary comparisons. Add regression tests for raw-dictionary covering no-coercion, null-containing, and multi-batch scenarios. --- .../functions-aggregate-common/src/min_max.rs | 36 +++++-- datafusion/functions-aggregate/src/min_max.rs | 95 +++++++++++++++++++ 2 files changed, 125 insertions(+), 6 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index 27620221cf23c..df950323d80d9 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -413,6 +413,28 @@ macro_rules! min_max { min_max_generic!(lhs, rhs, $OP) } + ( + ScalarValue::Dictionary(key_type, lhs_inner), + ScalarValue::Dictionary(_, rhs_inner), + ) => { + let winner = min_max_generic!(lhs_inner.as_ref(), rhs_inner.as_ref(), $OP); + ScalarValue::Dictionary(key_type.clone(), Box::new(winner)) + } + + ( + ScalarValue::Dictionary(_, lhs_inner), + rhs, + ) => { + min_max_generic!(lhs_inner.as_ref(), rhs, $OP) + } + + ( + lhs, + ScalarValue::Dictionary(_, rhs_inner), + ) => { + min_max_generic!(lhs, rhs_inner.as_ref(), $OP) + } + e => { return internal_err!( "MIN/MAX is not expected to receive scalars of incompatible types {:?}", @@ -766,9 +788,10 @@ pub fn min_batch(values: &ArrayRef) -> Result { DataType::FixedSizeList(_, _) => { min_max_batch_generic(values, Ordering::Greater)? } - DataType::Dictionary(_, _) => { - let values = values.as_any_dictionary().values(); - min_batch(values)? + DataType::Dictionary(key_type, _) => { + let dict_values = values.as_any_dictionary().values(); + let inner = min_batch(dict_values)?; + ScalarValue::Dictionary(key_type.clone(), Box::new(inner)) } _ => min_max_batch!(values, min), }) @@ -847,9 +870,10 @@ pub fn max_batch(values: &ArrayRef) -> Result { DataType::List(_) => min_max_batch_generic(values, Ordering::Less)?, DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Less)?, DataType::FixedSizeList(_, _) => min_max_batch_generic(values, Ordering::Less)?, - DataType::Dictionary(_, _) => { - let values = values.as_any_dictionary().values(); - max_batch(values)? + DataType::Dictionary(key_type, _) => { + let dict_values = values.as_any_dictionary().values(); + let inner = max_batch(dict_values)?; + ScalarValue::Dictionary(key_type.clone(), Box::new(inner)) } _ => min_max_batch!(values, max), }) diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index 9d05c57b02e93..d7aa9bfd1b5e2 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -1262,4 +1262,99 @@ mod tests { assert_eq!(max_result, ScalarValue::Utf8(Some("🦀".to_string()))); Ok(()) } + + fn dict_scalar(key_type: DataType, inner: ScalarValue) -> ScalarValue { + ScalarValue::Dictionary(Box::new(key_type), Box::new(inner)) + } + + #[test] + fn test_min_max_dictionary_without_coercion() -> Result<()> { + let values = StringArray::from(vec!["b", "c", "a", "d"]); + let keys = Int32Array::from(vec![Some(0), Some(1), Some(2), Some(3)]); + let dict_array = + DictionaryArray::try_new(keys, Arc::new(values) as ArrayRef).unwrap(); + let dict_array_ref = Arc::new(dict_array) as ArrayRef; + let dict_type = dict_array_ref.data_type().clone(); + + let mut min_acc = MinAccumulator::try_new(&dict_type)?; + min_acc.update_batch(&[Arc::clone(&dict_array_ref)])?; + let min_result = min_acc.evaluate()?; + assert_eq!( + min_result, + dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("a".to_string()))) + ); + + let mut max_acc = MaxAccumulator::try_new(&dict_type)?; + max_acc.update_batch(&[Arc::clone(&dict_array_ref)])?; + let max_result = max_acc.evaluate()?; + assert_eq!( + max_result, + dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("d".to_string()))) + ); + Ok(()) + } + + #[test] + fn test_min_max_dictionary_with_nulls() -> Result<()> { + let values = StringArray::from(vec!["b", "c", "a"]); + let keys = Int32Array::from(vec![None, Some(0), None, Some(1), Some(2)]); + let dict_array = + DictionaryArray::try_new(keys, Arc::new(values) as ArrayRef).unwrap(); + let dict_array_ref = Arc::new(dict_array) as ArrayRef; + let dict_type = dict_array_ref.data_type().clone(); + + let mut min_acc = MinAccumulator::try_new(&dict_type)?; + min_acc.update_batch(&[Arc::clone(&dict_array_ref)])?; + let min_result = min_acc.evaluate()?; + assert_eq!( + min_result, + dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("a".to_string()))) + ); + + let mut max_acc = MaxAccumulator::try_new(&dict_type)?; + max_acc.update_batch(&[Arc::clone(&dict_array_ref)])?; + let max_result = max_acc.evaluate()?; + assert_eq!( + max_result, + dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("c".to_string()))) + ); + Ok(()) + } + + #[test] + fn test_min_max_dictionary_multi_batch() -> Result<()> { + let dict_type = + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); + + let values1 = StringArray::from(vec!["b", "c"]); + let keys1 = Int32Array::from(vec![Some(0), Some(1)]); + let batch1 = Arc::new( + DictionaryArray::try_new(keys1, Arc::new(values1) as ArrayRef).unwrap(), + ) as ArrayRef; + + let values2 = StringArray::from(vec!["a", "d"]); + let keys2 = Int32Array::from(vec![Some(0), Some(1)]); + let batch2 = Arc::new( + DictionaryArray::try_new(keys2, Arc::new(values2) as ArrayRef).unwrap(), + ) as ArrayRef; + + let mut min_acc = MinAccumulator::try_new(&dict_type)?; + min_acc.update_batch(&[Arc::clone(&batch1)])?; + min_acc.update_batch(&[Arc::clone(&batch2)])?; + let min_result = min_acc.evaluate()?; + assert_eq!( + min_result, + dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("a".to_string()))) + ); + + let mut max_acc = MaxAccumulator::try_new(&dict_type)?; + max_acc.update_batch(&[Arc::clone(&batch1)])?; + max_acc.update_batch(&[Arc::clone(&batch2)])?; + let max_result = max_acc.evaluate()?; + assert_eq!( + max_result, + dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("d".to_string()))) + ); + Ok(()) + } } From caafe1ce6aee4467f929adbbaf5d6e5718567f07 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 2 Apr 2026 16:24:31 +0800 Subject: [PATCH 2/6] Refactor dictionary min/max logic and tests Centralize dictionary batch handling for min/max operations. Streamline min_max_batch_generic to initialize from the first non-null element. Implement shared setup/assert helpers in dictionary tests to reduce repetition while preserving test coverage. --- .../functions-aggregate-common/src/min_max.rs | 65 +++++---- datafusion/functions-aggregate/src/min_max.rs | 125 ++++++++---------- 2 files changed, 93 insertions(+), 97 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index df950323d80d9..4d984fc4b0c78 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -141,6 +141,16 @@ macro_rules! min_max_generic { }}; } +macro_rules! min_max_dictionary { + ($VALUE:expr, $DELTA:expr, wrap $KEY_TYPE:expr, $OP:ident) => {{ + let winner = min_max_generic!($VALUE, $DELTA, $OP); + ScalarValue::Dictionary($KEY_TYPE.clone(), Box::new(winner)) + }}; + ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ + min_max_generic!($VALUE, $DELTA, $OP) + }}; +} + // min/max of two scalar values of the same type macro_rules! min_max { ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ @@ -417,22 +427,26 @@ macro_rules! min_max { ScalarValue::Dictionary(key_type, lhs_inner), ScalarValue::Dictionary(_, rhs_inner), ) => { - let winner = min_max_generic!(lhs_inner.as_ref(), rhs_inner.as_ref(), $OP); - ScalarValue::Dictionary(key_type.clone(), Box::new(winner)) + min_max_dictionary!( + lhs_inner.as_ref(), + rhs_inner.as_ref(), + wrap key_type, + $OP + ) } ( ScalarValue::Dictionary(_, lhs_inner), rhs, ) => { - min_max_generic!(lhs_inner.as_ref(), rhs, $OP) + min_max_dictionary!(lhs_inner.as_ref(), rhs, $OP) } ( lhs, ScalarValue::Dictionary(_, rhs_inner), ) => { - min_max_generic!(lhs, rhs_inner.as_ref(), $OP) + min_max_dictionary!(lhs, rhs_inner.as_ref(), $OP) } e => { @@ -445,6 +459,17 @@ macro_rules! min_max { }}; } +fn dictionary_batch_extreme( + values: &ArrayRef, + extreme_fn: fn(&ArrayRef) -> Result, +) -> Result { + let DataType::Dictionary(key_type, _) = values.data_type() else { + unreachable!("dictionary_batch_extreme requires dictionary arrays") + }; + let inner = extreme_fn(values.as_any_dictionary().values())?; + Ok(ScalarValue::Dictionary(key_type.clone(), Box::new(inner))) +} + /// An accumulator to compute the maximum value #[derive(Debug, Clone)] pub struct MaxAccumulator { @@ -788,32 +813,22 @@ pub fn min_batch(values: &ArrayRef) -> Result { DataType::FixedSizeList(_, _) => { min_max_batch_generic(values, Ordering::Greater)? } - DataType::Dictionary(key_type, _) => { - let dict_values = values.as_any_dictionary().values(); - let inner = min_batch(dict_values)?; - ScalarValue::Dictionary(key_type.clone(), Box::new(inner)) - } + DataType::Dictionary(_, _) => dictionary_batch_extreme(values, min_batch)?, _ => min_max_batch!(values, min), }) } /// Generic min/max implementation for complex types fn min_max_batch_generic(array: &ArrayRef, ordering: Ordering) -> Result { - if array.len() == array.null_count() { + let mut non_null_indices = (0..array.len()).filter(|&i| !array.is_null(i)); + let Some(first_idx) = non_null_indices.next() else { return ScalarValue::try_from(array.data_type()); - } - let mut extreme = ScalarValue::try_from_array(array, 0)?; - for i in 1..array.len() { + }; + + let mut extreme = ScalarValue::try_from_array(array, first_idx)?; + for i in non_null_indices { let current = ScalarValue::try_from_array(array, i)?; - if current.is_null() { - continue; - } - if extreme.is_null() { - extreme = current; - continue; - } - let cmp = extreme.try_cmp(¤t)?; - if cmp == ordering { + if extreme.try_cmp(¤t)? == ordering { extreme = current; } } @@ -870,11 +885,7 @@ pub fn max_batch(values: &ArrayRef) -> Result { DataType::List(_) => min_max_batch_generic(values, Ordering::Less)?, DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Less)?, DataType::FixedSizeList(_, _) => min_max_batch_generic(values, Ordering::Less)?, - DataType::Dictionary(key_type, _) => { - let dict_values = values.as_any_dictionary().values(); - let inner = max_batch(dict_values)?; - ScalarValue::Dictionary(key_type.clone(), Box::new(inner)) - } + DataType::Dictionary(_, _) => dictionary_batch_extreme(values, max_batch)?, _ => min_max_batch!(values, max), }) } diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index d7aa9bfd1b5e2..5734f2854dd8d 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -1267,94 +1267,79 @@ mod tests { ScalarValue::Dictionary(Box::new(key_type), Box::new(inner)) } - #[test] - fn test_min_max_dictionary_without_coercion() -> Result<()> { - let values = StringArray::from(vec!["b", "c", "a", "d"]); - let keys = Int32Array::from(vec![Some(0), Some(1), Some(2), Some(3)]); - let dict_array = - DictionaryArray::try_new(keys, Arc::new(values) as ArrayRef).unwrap(); - let dict_array_ref = Arc::new(dict_array) as ArrayRef; - let dict_type = dict_array_ref.data_type().clone(); + fn utf8_dict_scalar(key_type: DataType, value: &str) -> ScalarValue { + dict_scalar(key_type, ScalarValue::Utf8(Some(value.to_string()))) + } + + fn string_dictionary_batch( + values: Vec<&str>, + keys: Vec>, + ) -> ArrayRef { + let values = Arc::new(StringArray::from(values)) as ArrayRef; + Arc::new(DictionaryArray::try_new(Int32Array::from(keys), values).unwrap()) + as ArrayRef + } + + fn assert_dictionary_min_max( + dict_type: &DataType, + batches: &[ArrayRef], + expected_min: &str, + expected_max: &str, + ) -> Result<()> { + let key_type = match dict_type { + DataType::Dictionary(key_type, _) => key_type.as_ref().clone(), + other => panic!("expected dictionary type, got {other:?}"), + }; - let mut min_acc = MinAccumulator::try_new(&dict_type)?; - min_acc.update_batch(&[Arc::clone(&dict_array_ref)])?; - let min_result = min_acc.evaluate()?; + let mut min_acc = MinAccumulator::try_new(dict_type)?; + for batch in batches { + min_acc.update_batch(&[Arc::clone(batch)])?; + } assert_eq!( - min_result, - dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("a".to_string()))) + min_acc.evaluate()?, + utf8_dict_scalar(key_type.clone(), expected_min) ); - let mut max_acc = MaxAccumulator::try_new(&dict_type)?; - max_acc.update_batch(&[Arc::clone(&dict_array_ref)])?; - let max_result = max_acc.evaluate()?; - assert_eq!( - max_result, - dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("d".to_string()))) - ); + let mut max_acc = MaxAccumulator::try_new(dict_type)?; + for batch in batches { + max_acc.update_batch(&[Arc::clone(batch)])?; + } + assert_eq!(max_acc.evaluate()?, utf8_dict_scalar(key_type, expected_max)); + Ok(()) } #[test] - fn test_min_max_dictionary_with_nulls() -> Result<()> { - let values = StringArray::from(vec!["b", "c", "a"]); - let keys = Int32Array::from(vec![None, Some(0), None, Some(1), Some(2)]); - let dict_array = - DictionaryArray::try_new(keys, Arc::new(values) as ArrayRef).unwrap(); - let dict_array_ref = Arc::new(dict_array) as ArrayRef; + fn test_min_max_dictionary_without_coercion() -> Result<()> { + let dict_array_ref = string_dictionary_batch( + vec!["b", "c", "a", "d"], + vec![Some(0), Some(1), Some(2), Some(3)], + ); let dict_type = dict_array_ref.data_type().clone(); - let mut min_acc = MinAccumulator::try_new(&dict_type)?; - min_acc.update_batch(&[Arc::clone(&dict_array_ref)])?; - let min_result = min_acc.evaluate()?; - assert_eq!( - min_result, - dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("a".to_string()))) - ); + assert_dictionary_min_max(&dict_type, &[dict_array_ref], "a", "d") + } - let mut max_acc = MaxAccumulator::try_new(&dict_type)?; - max_acc.update_batch(&[Arc::clone(&dict_array_ref)])?; - let max_result = max_acc.evaluate()?; - assert_eq!( - max_result, - dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("c".to_string()))) + #[test] + fn test_min_max_dictionary_with_nulls() -> Result<()> { + let dict_array_ref = string_dictionary_batch( + vec!["b", "c", "a"], + vec![None, Some(0), None, Some(1), Some(2)], ); - Ok(()) + let dict_type = dict_array_ref.data_type().clone(); + + assert_dictionary_min_max(&dict_type, &[dict_array_ref], "a", "c") } #[test] fn test_min_max_dictionary_multi_batch() -> Result<()> { let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); + let batch1 = + string_dictionary_batch(vec!["b", "c"], vec![Some(0), Some(1)]); + let batch2 = + string_dictionary_batch(vec!["a", "d"], vec![Some(0), Some(1)]); - let values1 = StringArray::from(vec!["b", "c"]); - let keys1 = Int32Array::from(vec![Some(0), Some(1)]); - let batch1 = Arc::new( - DictionaryArray::try_new(keys1, Arc::new(values1) as ArrayRef).unwrap(), - ) as ArrayRef; - - let values2 = StringArray::from(vec!["a", "d"]); - let keys2 = Int32Array::from(vec![Some(0), Some(1)]); - let batch2 = Arc::new( - DictionaryArray::try_new(keys2, Arc::new(values2) as ArrayRef).unwrap(), - ) as ArrayRef; - - let mut min_acc = MinAccumulator::try_new(&dict_type)?; - min_acc.update_batch(&[Arc::clone(&batch1)])?; - min_acc.update_batch(&[Arc::clone(&batch2)])?; - let min_result = min_acc.evaluate()?; - assert_eq!( - min_result, - dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("a".to_string()))) - ); - - let mut max_acc = MaxAccumulator::try_new(&dict_type)?; - max_acc.update_batch(&[Arc::clone(&batch1)])?; - max_acc.update_batch(&[Arc::clone(&batch2)])?; - let max_result = max_acc.evaluate()?; - assert_eq!( - max_result, - dict_scalar(DataType::Int32, ScalarValue::Utf8(Some("d".to_string()))) - ); - Ok(()) + assert_dictionary_min_max(&dict_type, &[batch1, batch2], "a", "d") } } From 0bbc56e23a68f7ed7500934a939b708f4cbb644e Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 2 Apr 2026 16:30:44 +0800 Subject: [PATCH 3/6] Simplify min/max flow in dictionary handling Refactor dictionary min/max flow by removing the wrap macro arm, making re-wrapping explicit through a private helper. This separates the "choose inner winner" from the "wrap as dictionary" step for easier auditing. In `datafusion/functions-aggregate/src/min_max.rs`, update `string_dictionary_batch` to accept slices instead of owned Vecs, and introduce a small `evaluate_dictionary_accumulator` helper to streamline min/max assertions with a shared accumulator execution path, reducing repeated setup. --- .../functions-aggregate-common/src/min_max.rs | 20 +++---- datafusion/functions-aggregate/src/min_max.rs | 59 ++++++++++--------- 2 files changed, 41 insertions(+), 38 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index 4d984fc4b0c78..aa802d4003f4d 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -142,13 +142,7 @@ macro_rules! min_max_generic { } macro_rules! min_max_dictionary { - ($VALUE:expr, $DELTA:expr, wrap $KEY_TYPE:expr, $OP:ident) => {{ - let winner = min_max_generic!($VALUE, $DELTA, $OP); - ScalarValue::Dictionary($KEY_TYPE.clone(), Box::new(winner)) - }}; - ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ - min_max_generic!($VALUE, $DELTA, $OP) - }}; + ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ min_max_generic!($VALUE, $DELTA, $OP) }}; } // min/max of two scalar values of the same type @@ -427,11 +421,13 @@ macro_rules! min_max { ScalarValue::Dictionary(key_type, lhs_inner), ScalarValue::Dictionary(_, rhs_inner), ) => { - min_max_dictionary!( + wrap_dictionary_scalar( + key_type.as_ref(), + min_max_dictionary!( lhs_inner.as_ref(), rhs_inner.as_ref(), - wrap key_type, $OP + ), ) } @@ -467,7 +463,11 @@ fn dictionary_batch_extreme( unreachable!("dictionary_batch_extreme requires dictionary arrays") }; let inner = extreme_fn(values.as_any_dictionary().values())?; - Ok(ScalarValue::Dictionary(key_type.clone(), Box::new(inner))) + Ok(wrap_dictionary_scalar(key_type.as_ref(), inner)) +} + +fn wrap_dictionary_scalar(key_type: &DataType, value: ScalarValue) -> ScalarValue { + ScalarValue::Dictionary(Box::new(key_type.clone()), Box::new(value)) } /// An accumulator to compute the maximum value diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index 5734f2854dd8d..6be7341ed10cc 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -1271,13 +1271,21 @@ mod tests { dict_scalar(key_type, ScalarValue::Utf8(Some(value.to_string()))) } - fn string_dictionary_batch( - values: Vec<&str>, - keys: Vec>, - ) -> ArrayRef { - let values = Arc::new(StringArray::from(values)) as ArrayRef; - Arc::new(DictionaryArray::try_new(Int32Array::from(keys), values).unwrap()) - as ArrayRef + fn string_dictionary_batch(values: &[&str], keys: &[Option]) -> ArrayRef { + let values = Arc::new(StringArray::from(values.to_vec())) as ArrayRef; + Arc::new( + DictionaryArray::try_new(Int32Array::from(keys.to_vec()), values).unwrap(), + ) as ArrayRef + } + + fn evaluate_dictionary_accumulator( + mut acc: impl Accumulator, + batches: &[ArrayRef], + ) -> Result { + for batch in batches { + acc.update_batch(&[Arc::clone(batch)])?; + } + acc.evaluate() } fn assert_dictionary_min_max( @@ -1291,20 +1299,17 @@ mod tests { other => panic!("expected dictionary type, got {other:?}"), }; - let mut min_acc = MinAccumulator::try_new(dict_type)?; - for batch in batches { - min_acc.update_batch(&[Arc::clone(batch)])?; - } - assert_eq!( - min_acc.evaluate()?, - utf8_dict_scalar(key_type.clone(), expected_min) - ); + let min_result = evaluate_dictionary_accumulator( + MinAccumulator::try_new(dict_type)?, + batches, + )?; + assert_eq!(min_result, utf8_dict_scalar(key_type.clone(), expected_min)); - let mut max_acc = MaxAccumulator::try_new(dict_type)?; - for batch in batches { - max_acc.update_batch(&[Arc::clone(batch)])?; - } - assert_eq!(max_acc.evaluate()?, utf8_dict_scalar(key_type, expected_max)); + let max_result = evaluate_dictionary_accumulator( + MaxAccumulator::try_new(dict_type)?, + batches, + )?; + assert_eq!(max_result, utf8_dict_scalar(key_type, expected_max)); Ok(()) } @@ -1312,8 +1317,8 @@ mod tests { #[test] fn test_min_max_dictionary_without_coercion() -> Result<()> { let dict_array_ref = string_dictionary_batch( - vec!["b", "c", "a", "d"], - vec![Some(0), Some(1), Some(2), Some(3)], + &["b", "c", "a", "d"], + &[Some(0), Some(1), Some(2), Some(3)], ); let dict_type = dict_array_ref.data_type().clone(); @@ -1323,8 +1328,8 @@ mod tests { #[test] fn test_min_max_dictionary_with_nulls() -> Result<()> { let dict_array_ref = string_dictionary_batch( - vec!["b", "c", "a"], - vec![None, Some(0), None, Some(1), Some(2)], + &["b", "c", "a"], + &[None, Some(0), None, Some(1), Some(2)], ); let dict_type = dict_array_ref.data_type().clone(); @@ -1335,10 +1340,8 @@ mod tests { fn test_min_max_dictionary_multi_batch() -> Result<()> { let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); - let batch1 = - string_dictionary_batch(vec!["b", "c"], vec![Some(0), Some(1)]); - let batch2 = - string_dictionary_batch(vec!["a", "d"], vec![Some(0), Some(1)]); + let batch1 = string_dictionary_batch(&["b", "c"], &[Some(0), Some(1)]); + let batch2 = string_dictionary_batch(&["a", "d"], &[Some(0), Some(1)]); assert_dictionary_min_max(&dict_type, &[batch1, batch2], "a", "d") } From 9240400ba56efcdb96c0622480dad16130cfeebb Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 2 Apr 2026 16:50:23 +0800 Subject: [PATCH 4/6] Fix dictionary min/max behavior in DataFusion Update min_max.rs to ensure dictionary batches iterate actual array rows, comparing referenced scalar values. Unreferenced dictionary entries no longer affect MIN/MAX, and referenced null values are correctly skipped. Expanded tests to cover these changes and updated expectations Added regression tests for unreferenced and referenced null dictionary values. --- .../functions-aggregate-common/src/min_max.rs | 37 +++++++++++++------ datafusion/functions-aggregate/src/min_max.rs | 34 ++++++++++++++++- 2 files changed, 59 insertions(+), 12 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index aa802d4003f4d..21e67cf442077 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -18,8 +18,8 @@ //! Basic min/max functionality shared across DataFusion aggregate functions use arrow::array::{ - ArrayRef, AsArray as _, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, - Date64Array, Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array, + ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array, + Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array, DurationMicrosecondArray, DurationMillisecondArray, DurationNanosecondArray, DurationSecondArray, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, IntervalDayTimeArray, @@ -457,13 +457,23 @@ macro_rules! min_max { fn dictionary_batch_extreme( values: &ArrayRef, - extreme_fn: fn(&ArrayRef) -> Result, + ordering: Ordering, ) -> Result { - let DataType::Dictionary(key_type, _) = values.data_type() else { - unreachable!("dictionary_batch_extreme requires dictionary arrays") - }; - let inner = extreme_fn(values.as_any_dictionary().values())?; - Ok(wrap_dictionary_scalar(key_type.as_ref(), inner)) + let mut extreme: Option = None; + + for i in 0..values.len() { + let current = ScalarValue::try_from_array(values, i)?; + if current.is_null() { + continue; + } + + match &extreme { + Some(existing) if existing.try_cmp(¤t)? != ordering => {} + _ => extreme = Some(current), + } + } + + extreme.map_or_else(|| ScalarValue::try_from(values.data_type()), Ok) } fn wrap_dictionary_scalar(key_type: &DataType, value: ScalarValue) -> ScalarValue { @@ -813,7 +823,9 @@ pub fn min_batch(values: &ArrayRef) -> Result { DataType::FixedSizeList(_, _) => { min_max_batch_generic(values, Ordering::Greater)? } - DataType::Dictionary(_, _) => dictionary_batch_extreme(values, min_batch)?, + DataType::Dictionary(_, _) => { + dictionary_batch_extreme(values, Ordering::Greater)? + } _ => min_max_batch!(values, min), }) } @@ -828,7 +840,10 @@ fn min_max_batch_generic(array: &ArrayRef, ordering: Ordering) -> Result Result { DataType::List(_) => min_max_batch_generic(values, Ordering::Less)?, DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Less)?, DataType::FixedSizeList(_, _) => min_max_batch_generic(values, Ordering::Less)?, - DataType::Dictionary(_, _) => dictionary_batch_extreme(values, max_batch)?, + DataType::Dictionary(_, _) => dictionary_batch_extreme(values, Ordering::Less)?, _ => min_max_batch!(values, max), }) } diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index 6be7341ed10cc..09142a4858b5f 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -1259,7 +1259,7 @@ mod tests { let mut max_acc = MaxAccumulator::try_new(&rt_type)?; max_acc.update_batch(&[Arc::clone(&dict_array_ref)])?; let max_result = max_acc.evaluate()?; - assert_eq!(max_result, ScalarValue::Utf8(Some("🦀".to_string()))); + assert_eq!(max_result, ScalarValue::Utf8(Some("d".to_string()))); Ok(()) } @@ -1278,6 +1278,16 @@ mod tests { ) as ArrayRef } + fn optional_string_dictionary_batch( + values: &[Option<&str>], + keys: &[Option], + ) -> ArrayRef { + let values = Arc::new(StringArray::from(values.to_vec())) as ArrayRef; + Arc::new( + DictionaryArray::try_new(Int32Array::from(keys.to_vec()), values).unwrap(), + ) as ArrayRef + } + fn evaluate_dictionary_accumulator( mut acc: impl Accumulator, batches: &[ArrayRef], @@ -1336,6 +1346,28 @@ mod tests { assert_dictionary_min_max(&dict_type, &[dict_array_ref], "a", "c") } + #[test] + fn test_min_max_dictionary_ignores_unreferenced_values() -> Result<()> { + let dict_array_ref = string_dictionary_batch( + &["a", "z", "zz_unused"], + &[Some(1), Some(1), None], + ); + let dict_type = dict_array_ref.data_type().clone(); + + assert_dictionary_min_max(&dict_type, &[dict_array_ref], "z", "z") + } + + #[test] + fn test_min_max_dictionary_ignores_referenced_null_values() -> Result<()> { + let dict_array_ref = optional_string_dictionary_batch( + &[Some("b"), None, Some("a"), Some("d")], + &[Some(0), Some(1), Some(2), Some(3)], + ); + let dict_type = dict_array_ref.data_type().clone(); + + assert_dictionary_min_max(&dict_type, &[dict_array_ref], "a", "d") + } + #[test] fn test_min_max_dictionary_multi_batch() -> Result<()> { let dict_type = From ed2d3fd1e44363a9f6bea2d0d2c681c357e1ae59 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 7 Apr 2026 11:47:34 +0800 Subject: [PATCH 5/6] Refactor min/max logic for shared row-wise handling Consolidate row-wise min/max scan logic into a single helper in min_max.rs to ensure consistency between dictionary and generic complex-type paths. Add regression test for the float dictionary handling NaN and -inf cases, validating ordering semantics across batches. --- .../functions-aggregate-common/src/min_max.rs | 29 ++------------ datafusion/functions-aggregate/src/min_max.rs | 38 +++++++++++++++++++ 2 files changed, 42 insertions(+), 25 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index 21e67cf442077..8bce942bfa4c5 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -455,10 +455,7 @@ macro_rules! min_max { }}; } -fn dictionary_batch_extreme( - values: &ArrayRef, - ordering: Ordering, -) -> Result { +fn scalar_batch_extreme(values: &ArrayRef, ordering: Ordering) -> Result { let mut extreme: Option = None; for i in 0..values.len() { @@ -823,32 +820,14 @@ pub fn min_batch(values: &ArrayRef) -> Result { DataType::FixedSizeList(_, _) => { min_max_batch_generic(values, Ordering::Greater)? } - DataType::Dictionary(_, _) => { - dictionary_batch_extreme(values, Ordering::Greater)? - } + DataType::Dictionary(_, _) => scalar_batch_extreme(values, Ordering::Greater)?, _ => min_max_batch!(values, min), }) } /// Generic min/max implementation for complex types fn min_max_batch_generic(array: &ArrayRef, ordering: Ordering) -> Result { - let mut non_null_indices = (0..array.len()).filter(|&i| !array.is_null(i)); - let Some(first_idx) = non_null_indices.next() else { - return ScalarValue::try_from(array.data_type()); - }; - - let mut extreme = ScalarValue::try_from_array(array, first_idx)?; - for i in non_null_indices { - let current = ScalarValue::try_from_array(array, i)?; - if current.is_null() { - continue; - } - if extreme.is_null() || extreme.try_cmp(¤t)? == ordering { - extreme = current; - } - } - - Ok(extreme) + scalar_batch_extreme(array, ordering) } /// dynamically-typed max(array) -> ScalarValue @@ -900,7 +879,7 @@ pub fn max_batch(values: &ArrayRef) -> Result { DataType::List(_) => min_max_batch_generic(values, Ordering::Less)?, DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Less)?, DataType::FixedSizeList(_, _) => min_max_batch_generic(values, Ordering::Less)?, - DataType::Dictionary(_, _) => dictionary_batch_extreme(values, Ordering::Less)?, + DataType::Dictionary(_, _) => scalar_batch_extreme(values, Ordering::Less)?, _ => min_max_batch!(values, max), }) } diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index 09142a4858b5f..fc6ec7bbc0e64 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -1288,6 +1288,13 @@ mod tests { ) as ArrayRef } + fn float_dictionary_batch(values: &[f32], keys: &[Option]) -> ArrayRef { + let values = Arc::new(Float32Array::from(values.to_vec())) as ArrayRef; + Arc::new( + DictionaryArray::try_new(Int32Array::from(keys.to_vec()), values).unwrap(), + ) as ArrayRef + } + fn evaluate_dictionary_accumulator( mut acc: impl Accumulator, batches: &[ArrayRef], @@ -1377,4 +1384,35 @@ mod tests { assert_dictionary_min_max(&dict_type, &[batch1, batch2], "a", "d") } + + #[test] + fn test_min_max_dictionary_float_with_nans() -> Result<()> { + let dict_type = + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Float32)); + let batch1 = float_dictionary_batch(&[0.0, f32::NAN], &[Some(0), Some(1)]); + let batch2 = float_dictionary_batch(&[f32::NEG_INFINITY], &[Some(0)]); + + let min_result = evaluate_dictionary_accumulator( + MinAccumulator::try_new(&dict_type)?, + &[Arc::clone(&batch1), Arc::clone(&batch2)], + )?; + assert_eq!( + min_result, + dict_scalar( + DataType::Int32, + ScalarValue::Float32(Some(f32::NEG_INFINITY)), + ) + ); + + let max_result = evaluate_dictionary_accumulator( + MaxAccumulator::try_new(&dict_type)?, + &[batch1, batch2], + )?; + assert_eq!( + max_result, + dict_scalar(DataType::Int32, ScalarValue::Float32(Some(f32::NAN))) + ); + + Ok(()) + } } From dad6e022362ac0924997d044d3520e4a41de61f8 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 7 Apr 2026 11:52:41 +0800 Subject: [PATCH 6/6] Refactor dictionary handling and simplify batch logic Remove the no-op dictionary macro and single-use wrapper. Collapse dictionary handling into a normalized path and seed scalar_batch_extreme from the first non-null value. Unify row-wise batch dispatch behind a shared predicate. Apply formatting adjustments in min_max.rs as per cargo fmt. --- .../functions-aggregate-common/src/min_max.rs | 110 +++++++++--------- datafusion/functions-aggregate/src/min_max.rs | 6 +- 2 files changed, 56 insertions(+), 60 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/min_max.rs b/datafusion/functions-aggregate-common/src/min_max.rs index 8bce942bfa4c5..ef48fd3f69c9b 100644 --- a/datafusion/functions-aggregate-common/src/min_max.rs +++ b/datafusion/functions-aggregate-common/src/min_max.rs @@ -141,10 +141,6 @@ macro_rules! min_max_generic { }}; } -macro_rules! min_max_dictionary { - ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ min_max_generic!($VALUE, $DELTA, $OP) }}; -} - // min/max of two scalar values of the same type macro_rules! min_max { ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ @@ -417,32 +413,20 @@ macro_rules! min_max { min_max_generic!(lhs, rhs, $OP) } - ( - ScalarValue::Dictionary(key_type, lhs_inner), - ScalarValue::Dictionary(_, rhs_inner), - ) => { - wrap_dictionary_scalar( - key_type.as_ref(), - min_max_dictionary!( - lhs_inner.as_ref(), - rhs_inner.as_ref(), - $OP - ), - ) - } + (lhs, rhs) + if matches!(lhs, ScalarValue::Dictionary(_, _)) + || matches!(rhs, ScalarValue::Dictionary(_, _)) => + { + let (lhs, lhs_key_type) = dictionary_scalar_parts(lhs); + let (rhs, rhs_key_type) = dictionary_scalar_parts(rhs); + let result = min_max_generic!(lhs, rhs, $OP); - ( - ScalarValue::Dictionary(_, lhs_inner), - rhs, - ) => { - min_max_dictionary!(lhs_inner.as_ref(), rhs, $OP) - } - - ( - lhs, - ScalarValue::Dictionary(_, rhs_inner), - ) => { - min_max_dictionary!(lhs, rhs_inner.as_ref(), $OP) + match lhs_key_type.zip(rhs_key_type) { + Some((key_type, _)) => { + ScalarValue::Dictionary(Box::new(key_type.clone()), Box::new(result)) + } + None => result, + } } e => { @@ -456,25 +440,50 @@ macro_rules! min_max { } fn scalar_batch_extreme(values: &ArrayRef, ordering: Ordering) -> Result { - let mut extreme: Option = None; + let mut index = 0; + let mut extreme = loop { + if index == values.len() { + return ScalarValue::try_from(values.data_type()); + } + + let current = ScalarValue::try_from_array(values, index)?; + index += 1; - for i in 0..values.len() { - let current = ScalarValue::try_from_array(values, i)?; - if current.is_null() { - continue; + if !current.is_null() { + break current; } + }; + + while index < values.len() { + let current = ScalarValue::try_from_array(values, index)?; + index += 1; - match &extreme { - Some(existing) if existing.try_cmp(¤t)? != ordering => {} - _ => extreme = Some(current), + if !current.is_null() && extreme.try_cmp(¤t)? == ordering { + extreme = current; } } - extreme.map_or_else(|| ScalarValue::try_from(values.data_type()), Ok) + Ok(extreme) +} + +fn dictionary_scalar_parts(value: &ScalarValue) -> (&ScalarValue, Option<&DataType>) { + match value { + ScalarValue::Dictionary(key_type, inner) => { + (inner.as_ref(), Some(key_type.as_ref())) + } + other => (other, None), + } } -fn wrap_dictionary_scalar(key_type: &DataType, value: ScalarValue) -> ScalarValue { - ScalarValue::Dictionary(Box::new(key_type.clone()), Box::new(value)) +fn is_row_wise_batch_type(data_type: &DataType) -> bool { + matches!( + data_type, + DataType::Struct(_) + | DataType::List(_) + | DataType::LargeList(_) + | DataType::FixedSizeList(_, _) + | DataType::Dictionary(_, _) + ) } /// An accumulator to compute the maximum value @@ -814,22 +823,13 @@ pub fn min_batch(values: &ArrayRef) -> Result { min_binary_view ) } - DataType::Struct(_) => min_max_batch_generic(values, Ordering::Greater)?, - DataType::List(_) => min_max_batch_generic(values, Ordering::Greater)?, - DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Greater)?, - DataType::FixedSizeList(_, _) => { - min_max_batch_generic(values, Ordering::Greater)? + data_type if is_row_wise_batch_type(data_type) => { + scalar_batch_extreme(values, Ordering::Greater)? } - DataType::Dictionary(_, _) => scalar_batch_extreme(values, Ordering::Greater)?, _ => min_max_batch!(values, min), }) } -/// Generic min/max implementation for complex types -fn min_max_batch_generic(array: &ArrayRef, ordering: Ordering) -> Result { - scalar_batch_extreme(array, ordering) -} - /// dynamically-typed max(array) -> ScalarValue pub fn max_batch(values: &ArrayRef) -> Result { Ok(match values.data_type() { @@ -875,11 +875,9 @@ pub fn max_batch(values: &ArrayRef) -> Result { let value = value.map(|e| e.to_vec()); ScalarValue::FixedSizeBinary(*size, value) } - DataType::Struct(_) => min_max_batch_generic(values, Ordering::Less)?, - DataType::List(_) => min_max_batch_generic(values, Ordering::Less)?, - DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Less)?, - DataType::FixedSizeList(_, _) => min_max_batch_generic(values, Ordering::Less)?, - DataType::Dictionary(_, _) => scalar_batch_extreme(values, Ordering::Less)?, + data_type if is_row_wise_batch_type(data_type) => { + scalar_batch_extreme(values, Ordering::Less)? + } _ => min_max_batch!(values, max), }) } diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index fc6ec7bbc0e64..9bd7e153d382b 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -1355,10 +1355,8 @@ mod tests { #[test] fn test_min_max_dictionary_ignores_unreferenced_values() -> Result<()> { - let dict_array_ref = string_dictionary_batch( - &["a", "z", "zz_unused"], - &[Some(1), Some(1), None], - ); + let dict_array_ref = + string_dictionary_batch(&["a", "z", "zz_unused"], &[Some(1), Some(1), None]); let dict_type = dict_array_ref.data_type().clone(); assert_dictionary_min_max(&dict_type, &[dict_array_ref], "z", "z")