Commit 44d604e

[Analytics Backend / DataFusion] Reshape mvappend operands as uniform lists; add Decimal128 element support

Two follow-ons to the initial mvappend onboarding (40b2161), both surfaced once the SQL companion #5424 (`MVAppendFunctionImpl.leastRestrictive`) let homogeneous-type calls reach substrait conversion.

# Uniform-list operand reshape

Substrait's variadic-`any1` argument shape requires every operand at the same variadic position to share a type. PPL's `mvappend(arg, …)` accepts a mix of bare scalars and arrays, which substrait's signature matcher rejected with `Unable to convert call mvappend(list<i32?>, i32?, i32?)`.

`MvappendAdapter` now wraps each scalar operand in a singleton `make_array(scalar)` call (using the locally-declared `MakeArrayAdapter.LOCAL_MAKE_ARRAY_OP`), so by the time the substrait converter sees the operands they are uniformly `list<componentType>`. The yaml impl was correspondingly tightened from `args: [{ value: any1 }] variadic` to `args: [{ value: list<any1?> }] variadic`. The Rust UDF (`udf::mvappend`) keeps its scalar-handling branch intact as a defensive fallback, but in practice every operand it sees is now a list.

# Decimal128 element type

Calcite's leastRestrictive widening on INT + DECIMAL produces DECIMAL(p, s), which substrait converts to Decimal128(p, s); the Java adapter casts every operand's element type to that. The Rust UDF needed an explicit `DataType::Decimal128(p, s)` branch: Decimal128Builder requires `.with_precision_and_scale(p, s)` configuration before use, and Decimal128Array elements are read via the `i128`-valued `value(i)` accessor (not via the generic `build!` macro).

# Pass-rate (CalciteMVAppendFunctionIT, force-routed, with companion #5424 applied)

* Before this commit: 6/15 (initial mvappend onboarding).
* After this commit: 10/15. Newly passing: testMvappendWithMixedArrayAndScalar (uniform-list reshape), testMvappendWithComplexExpression (uniform-list reshape), testMvappendWithIntAndDouble (Decimal128 element), testMvappendWithNumericArrays (Decimal128 element).

Remaining 5 failures:

* testMvappendWithMixedTypes / WithFieldsAndLiterals / WithEmptyArray / WithNull: the call legitimately widens to ARRAY<ANY> because the operands contain pairs of types with no common widened type (INT + VARCHAR). The Calcite engine handles ANY via Object generic dispatch; substrait can't encode it. Out of scope without changing PPL UDF semantics.
* testMvappendInWhereClause: uses `where array_length(combined) = 2`, which the analytics-engine planner rejects with "No backend can evaluate filter predicate [EQUALS] on fields [combined:ARRAY]". This is a filter-side capability gap unrelated to mvappend.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

1 parent 40b2161 commit 44d604e

3 files changed

Lines changed: 77 additions & 24 deletions

sandbox/plugins/analytics-backend-datafusion/rust/src/udf/mvappend.rs

Lines changed: 54 additions & 5 deletions
@@ -36,11 +36,12 @@ use std::any::Any;
 use std::sync::Arc;
 
 use datafusion::arrow::array::{
-    Array, ArrayRef, AsArray, BooleanArray, BooleanBuilder, Float32Array, Float32Builder,
-    Float64Array, Float64Builder, GenericListArray, Int16Array, Int16Builder, Int32Array,
-    Int32Builder, Int64Array, Int64Builder, Int8Array, Int8Builder, ListArray, ListBuilder,
-    StringArray, StringBuilder, StringViewArray, StringViewBuilder, UInt16Array, UInt16Builder,
-    UInt32Array, UInt32Builder, UInt64Array, UInt64Builder, UInt8Array, UInt8Builder,
+    Array, ArrayRef, AsArray, BooleanArray, BooleanBuilder, Decimal128Array, Decimal128Builder,
+    Float32Array, Float32Builder, Float64Array, Float64Builder, GenericListArray, Int16Array,
+    Int16Builder, Int32Array, Int32Builder, Int64Array, Int64Builder, Int8Array, Int8Builder,
+    ListArray, ListBuilder, StringArray, StringBuilder, StringViewArray, StringViewBuilder,
+    UInt16Array, UInt16Builder, UInt32Array, UInt32Builder, UInt64Array, UInt64Builder,
+    UInt8Array, UInt8Builder,
 };
 use datafusion::arrow::datatypes::{DataType, Field};
 use datafusion::common::plan_err;
@@ -182,6 +183,54 @@ impl ScalarUDFImpl for MvappendUdf {
             DataType::Float32 => build!(Float32Builder, Float32Array, ListArray),
             DataType::Float64 => build!(Float64Builder, Float64Array, ListArray),
             DataType::Boolean => build!(BooleanBuilder, BooleanArray, ListArray),
+            // Decimal128 element type — needs a builder configured with the same precision
+            // and scale as the input. Calcite's leastRestrictive widening for INT + DECIMAL
+            // produces DECIMAL(p, s) which substrait converts to Decimal128(p, s); the Java
+            // adapter's CAST aligns every operand's element type to that.
+            DataType::Decimal128(precision, scale) => {
+                let inner = Decimal128Builder::new()
+                    .with_precision_and_scale(*precision, *scale)
+                    .map_err(|e| DataFusionError::Plan(format!("mvappend: decimal builder: {e}")))?;
+                let mut builder = ListBuilder::new(inner);
+                for row in 0..n {
+                    let mut any_value = false;
+                    for arr in &operand_arrays {
+                        if arr.is_null(row) {
+                            continue;
+                        }
+                        if let Some(list_arr) = arr.as_any().downcast_ref::<GenericListArray<i32>>() {
+                            let row_list = list_arr.value(row);
+                            let typed = row_list
+                                .as_any()
+                                .downcast_ref::<Decimal128Array>()
+                                .ok_or_else(|| DataFusionError::Internal(format!(
+                                    "mvappend: list element vector type mismatch ({:?})",
+                                    row_list.data_type()
+                                )))?;
+                            for i in 0..typed.len() {
+                                if !typed.is_null(i) {
+                                    builder.values().append_value(typed.value(i));
+                                    any_value = true;
+                                }
+                            }
+                        } else if let Some(typed) = arr.as_any().downcast_ref::<Decimal128Array>() {
+                            builder.values().append_value(typed.value(row));
+                            any_value = true;
+                        } else {
+                            return plan_err!(
+                                "mvappend: unexpected operand vector type {:?}",
+                                arr.data_type()
+                            );
+                        }
+                    }
+                    if any_value {
+                        builder.append(true);
+                    } else {
+                        builder.append_null();
+                    }
+                }
+                Arc::new(builder.finish()) as ArrayRef
+            }
             // String element types — handled specially because list children may be any of
             // {Utf8, LargeUtf8, Utf8View} depending on whether the operand is a string literal,
             // a field read (DataFusion's substrait consumer uses Utf8View for column reads in
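
The Decimal128 branch above carries `precision` and `scale` alongside `i128` values because a Decimal128 element stores an unscaled integer: the logical value is the stored integer divided by 10^scale. The following std-only sketch illustrates that representation for the INT-to-DECIMAL(p, s) widening the comment describes. Both function names are hypothetical and are not part of the commit or of the arrow crate; the sketch assumes a non-negative scale of at most 38.

```rust
/// Hypothetical helper (not from the commit): rescale an integer into the
/// unscaled i128 representation used for DECIMAL(precision, scale).
/// Returns None if the result overflows i128 or needs more than
/// `precision` decimal digits.
fn int_to_decimal128(value: i64, precision: u8, scale: u8) -> Option<i128> {
    let factor = 10i128.checked_pow(u32::from(scale))?;
    let unscaled = i128::from(value).checked_mul(factor)?;
    let limit = 10i128.checked_pow(u32::from(precision))?;
    if unscaled.unsigned_abs() < limit.unsigned_abs() {
        Some(unscaled)
    } else {
        None
    }
}

/// Hypothetical helper: render an unscaled value as a decimal string.
/// Assumes scale <= 38 so that 10^scale fits in u128.
fn format_decimal128(unscaled: i128, scale: u8) -> String {
    if scale == 0 {
        return unscaled.to_string();
    }
    let factor = 10u128.pow(u32::from(scale));
    let magnitude = unscaled.unsigned_abs();
    let sign = if unscaled < 0 { "-" } else { "" };
    format!(
        "{}{}.{:0width$}",
        sign,
        magnitude / factor,
        magnitude % factor,
        width = usize::from(scale)
    )
}
```

Under these assumptions, `int_to_decimal128(7, 10, 2)` yields the unscaled value 700, which `format_decimal128(700, 2)` renders as `7.00`. A builder that did not know the scale could not interpret 700 correctly, which is one way to read why the UDF configures the builder explicitly.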

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/MvappendAdapter.java

Lines changed: 14 additions & 12 deletions
@@ -65,29 +65,31 @@ public RexNode adapt(RexCall original, List<FieldStorageInfo> fieldStorage, RelO
         RelDataType arrayType = original.getType();
         RelDataType componentType = arrayType.getComponentType();
         if (componentType == null) {
-            // Defensive — Calcite always assigns ARRAY<X> as mvappend's return type. Pass
-            // operands through untouched and let substrait surface the error if anything's
-            // off.
             return rexBuilder.makeCall(arrayType, LOCAL_MVAPPEND_OP, original.getOperands());
         }
+        // Substrait's variadic {@code any1} parameter requires every operand at the same
+        // variadic position to share a type. PPL's {@code mvappend(arg, …)} accepts a mix
+        // of bare scalars and arrays, which substrait's signature matcher rejects with
+        // {@code Unable to convert call mvappend(list<…>, scalar, …)}. Normalize every
+        // operand to {@code ARRAY<componentType>} — array operands cast their element
+        // type if it differs; scalar operands wrap in a {@code make_array(…)} singleton
+        // call. The Rust UDF then sees a uniform {@code list<any1>} variadic.
+        RelDataType targetArrayType = cluster.getTypeFactory().createArrayType(componentType, -1);
         List<RexNode> coerced = new ArrayList<>(original.getOperands().size());
         for (RexNode operand : original.getOperands()) {
             RelDataType operandType = operand.getType();
             if (operandType.getComponentType() != null) {
                 // Array operand — cast to ARRAY<componentType> if its element type differs.
-                RelDataType targetArray = cluster.getTypeFactory().createArrayType(componentType, -1);
-                if (operandType.equals(targetArray)) {
+                if (operandType.equals(targetArrayType)) {
                     coerced.add(operand);
                 } else {
-                    coerced.add(rexBuilder.makeCast(targetArray, operand, true, false));
+                    coerced.add(rexBuilder.makeCast(targetArrayType, operand, true, false));
                 }
             } else {
-                // Scalar operand — cast to componentType if its type differs.
-                if (operandType.equals(componentType)) {
-                    coerced.add(operand);
-                } else {
-                    coerced.add(rexBuilder.makeCast(componentType, operand, true, false));
-                }
+                // Scalar operand — first cast to componentType (so the singleton array's
+                // element type matches), then wrap in make_array so substrait sees a list.
+                RexNode casted = operandType.equals(componentType) ? operand : rexBuilder.makeCast(componentType, operand, true, false);
+                coerced.add(rexBuilder.makeCall(targetArrayType, MakeArrayAdapter.LOCAL_MAKE_ARRAY_OP, List.of(casted)));
             }
         }
         return rexBuilder.makeCall(arrayType, LOCAL_MVAPPEND_OP, coerced);
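
The adapter's normalization rule can be modeled outside Calcite as a small expression rewrite. The sketch below is a std-only Rust illustration of that rule, not a port of the Java code: `Ty`, `Expr`, and `normalize` are hypothetical names, and the type system is reduced to two scalar types and a list type.

```rust
/// Hypothetical, simplified type model (not Calcite's RelDataType).
#[derive(Debug, Clone, PartialEq)]
enum Ty {
    Int,
    Decimal,
    List(Box<Ty>),
}

/// Hypothetical expression model standing in for RexNode.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Input(Ty),
    Cast { to: Ty, inner: Box<Expr> },
    MakeArray { ty: Ty, element: Box<Expr> },
}

impl Expr {
    /// The type this expression evaluates to.
    fn ty(&self) -> &Ty {
        match self {
            Expr::Input(ty) => ty,
            Expr::Cast { to, .. } => to,
            Expr::MakeArray { ty, .. } => ty,
        }
    }
}

/// Normalize one operand to List(component), following the same two cases
/// as the adapter: cast lists whose type differs, wrap scalars in a singleton.
fn normalize(operand: Expr, component: &Ty) -> Expr {
    let target = Ty::List(Box::new(component.clone()));
    if matches!(operand.ty(), Ty::List(_)) {
        // List operand: pass through if already the target type, else cast.
        if *operand.ty() == target {
            operand
        } else {
            Expr::Cast { to: target, inner: Box::new(operand) }
        }
    } else {
        // Scalar operand: align the element type first, then wrap.
        let element = if operand.ty() == component {
            operand
        } else {
            Expr::Cast { to: component.clone(), inner: Box::new(operand) }
        };
        Expr::MakeArray { ty: target, element: Box::new(element) }
    }
}
```

In this model, normalizing the operands of `mvappend(list<Int>, Int, Int)` against component type `Int` leaves the first operand untouched and turns the other two into `MakeArray` nodes, so all three report the type `List(Int)`.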

sandbox/plugins/analytics-backend-datafusion/src/main/resources/opensearch_array_functions.yaml

Lines changed: 9 additions & 7 deletions
@@ -85,15 +85,17 @@ scalar_functions:
 
   - name: mvappend
     description: >-
-      Flatten a mixed list of array and scalar arguments into one array,
-      dropping null arguments and null elements within array arguments. Returns
-      NULL if no non-null elements were collected. PPL surface is
-      {@code mvappend(arg1, arg2, …)}; backed by a custom Rust UDF on the
-      analytics-backend-datafusion plugin (DataFusion's array_concat only
-      accepts arrays and preserves nulls — different semantics).
+      Flatten a list of arrays into one array, dropping null arrays and null
+      elements within array arguments. Returns NULL if no non-null elements
+      were collected. PPL surface is {@code mvappend(arg1, arg2, …)} which
+      accepts mixed scalar+array operands; the Java adapter wraps each
+      scalar in a singleton {@code make_array(…)} call so by the time the
+      Rust UDF sees the operands they're uniformly arrays. Backed by a custom
+      Rust UDF on the analytics-backend-datafusion plugin (DataFusion's
+      array_concat preserves nulls — different semantics).
     impls:
       - args:
-          - value: "any1"
+          - value: "list<any1?>"
             name: arg
         variadic:
           min: 1
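
The semantics stated in the description (drop null arrays, drop null elements, return NULL when nothing was collected) can be written down for a single row with plain Rust options. This is a minimal sketch of those semantics only; `Operand` and `mvappend_row` are hypothetical names, the element type is fixed to `i64` for brevity, and the real UDF operates on Arrow arrays rather than `Vec`.

```rust
/// One operand for a single row after the reshape:
/// a nullable list of nullable elements. Hypothetical alias.
type Operand = Option<Vec<Option<i64>>>;

/// Flatten all operands of one row into a single list.
/// Null operands and null elements are skipped; if no element survives,
/// the result is None (standing in for SQL NULL).
fn mvappend_row(operands: &[Operand]) -> Option<Vec<i64>> {
    let mut out = Vec::new();
    for operand in operands {
        // A null operand contributes nothing.
        let Some(list) = operand else { continue };
        for element in list {
            // A null element inside a list is dropped.
            if let Some(value) = element {
                out.push(*value);
            }
        }
    }
    if out.is_empty() {
        None
    } else {
        Some(out)
    }
}
```

For example, the operands `[1, NULL]`, `NULL`, and `[2]` flatten to `[1, 2]`, while a row whose operands are all null or contain only null elements produces NULL rather than an empty array.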
