Skip to content

Commit 40b2161

Browse files
committed
[Analytics Backend / DataFusion] Onboard PPL mvappend via custom Rust UDF
PPL `mvappend(arg1, arg2, …)` flattens a mixed list of array and scalar arguments into one array, dropping null arguments and null elements within array arguments. DataFusion's `array_concat` is the closest stdlib match but only accepts arrays (not mixed array+scalar) and preserves nulls — different semantics. Onboards as a custom Rust ScalarUDF on the analytics-backend-datafusion plugin's session context, mirroring the mvzip / mvfind pattern. Templated shape: Rust side: udf::mvappend::MvappendUdf — Signature::user_defined; per-row walk over operands, skipping NULL args and NULL elements inside array args, with explicit Arrow type arms for {Int8/16/32/64, UInt8/16/32/64, Float32/64, Boolean, Utf8/LargeUtf8/Utf8View}. The string arms output List<Utf8> or List<Utf8View> depending on the inferred element type so the result schema matches what `return_type` declared (DataFusion's execution-time schema check rejects mismatches). Defensive Null element-type arm covers the empty-array shape. 6 unit tests. Registered on each session context via udf::register_all. Java side: ScalarFunction.MVAPPEND enum entry (SqlKind.OTHER_FUNCTION; resolves through identifier-name valueOf("MVAPPEND")). MvappendAdapter — locally-declared SqlFunction("mvappend") + ADDITIONAL_SCALAR_SIGS bridge. Casts every scalar operand to the call's array component type and every array operand to ARRAY<componentType> before substrait emission, so the UDF sees a single uniform element type across all positions. DataFusionAnalyticsBackendPlugin: ARRAY_RETURNING_PROJECT_OPS membership (returns ARRAY<commonType>); adapter registration in scalarFunctionAdapters(). opensearch_array_functions.yaml: variadic min:1 entry with `list<any1?>` return type. # Pass-rate (CalciteMVAppendFunctionIT, force-routed) * Before: 0/15. * After: 6/15. Newly passing: testMvappendWithMultipleElements, testMvappendWithSingleElement, testMvappendWithArrayFlattening, testMvappendWithStringValues, testMvappendWithNestedArrays, testMvappendWithRealFields. # Remaining failures * 8 tests fail with "Unable to convert the type ANY". Root cause is PPL's MVAppendFunctionImpl.updateMostGeneralType using strict Object.equals on each pair of operand types, returning Calcite's ANY type when any two don't match — including when they only differ in nullability tag (a literal 3 is INTEGER NOT NULL but the component type of `array(1, 2)` is INTEGER NULLABLE). Substrait can't serialize ANY. The fix belongs in the SQL plugin's MVAppendFunctionImpl (use typeFactory.leastRestrictive instead of Object.equals) and isn't addressed here. * testMvappendInWhereClause — uses `where array_length(combined) = 2` which the analytics-engine planner rejects with "No backend can evaluate filter predicate [EQUALS] on fields [combined:ARRAY]". Filter-side capability gap unrelated to mvappend. * testMvappendWithComplexExpression — fails substrait conversion on a nested mvappend call ("Unable to convert call mvappend(list, …)"), likely the same nullability widening pattern flowing through nested calls. Same upstream fix applies. # Pass-rate impact on the broader CalciteArrayFunctionIT Unchanged at 43/60 — mvappend isn't exercised there. Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 67a97ad commit 40b2161

7 files changed

Lines changed: 613 additions & 4 deletions

File tree

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/ScalarFunction.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,16 @@ public enum ScalarFunction {
174174
* DataFusion stdlib equivalent — the analytics-backend-datafusion plugin
175175
* ships a custom Rust UDF (`udf::mvfind`) registered on its session context.
176176
*/
177-
MVFIND(Category.SCALAR, SqlKind.OTHER_FUNCTION);
177+
MVFIND(Category.SCALAR, SqlKind.OTHER_FUNCTION),
178+
/**
179+
* PPL {@code mvappend(arg1, arg2, …)} — flatten a mixed list of array and
180+
* scalar arguments into one array, dropping null args and null elements.
181+
* Resolves through the SQL plugin's {@code MVAppendFunctionImpl} UDF named
182+
* {@code "mvappend"}. DataFusion's {@code array_concat} only accepts arrays
183+
* and preserves nulls, so the analytics-backend-datafusion plugin ships a
184+
* custom Rust UDF ({@code udf::mvappend}) registered on its session context.
185+
*/
186+
MVAPPEND(Category.SCALAR, SqlKind.OTHER_FUNCTION);
178187

179188
/**
180189
* Category of scalar function.

sandbox/plugins/analytics-backend-datafusion/rust/src/udf/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,14 +113,16 @@ pub(crate) fn coerce_args(
113113
}
114114

115115
pub mod convert_tz;
116+
pub mod mvappend;
116117
pub mod mvfind;
117118
pub mod mvzip;
118119

119120
pub fn register_all(ctx: &SessionContext) {
120121
convert_tz::register_all(ctx);
121122
mvzip::register_all(ctx);
122123
mvfind::register_all(ctx);
123-
log::info!("OpenSearch UDF register_all: convert_tz, mvzip, mvfind registered");
124+
mvappend::register_all(ctx);
125+
log::info!("OpenSearch UDF register_all: convert_tz, mvzip, mvfind, mvappend registered");
124126
}
125127

126128
#[cfg(test)]

0 commit comments

Comments
 (0)