Skip to content

Commit c059c38

Browse files
committed
[Analytics Backend / DataFusion] Onboard PPL mvzip via custom Rust UDF
PPL `mvzip(left, right [, sep])` element-wise zips two arrays into a list of strings, joined per pair by a separator (default `,`). DataFusion has no stdlib equivalent — `array_concat` is end-to-end concatenation, and Substrait's lambda support is too thin for a transform/zip rewrite — so this onboards a custom Rust ScalarUDF on the analytics-backend-datafusion plugin's session context and wires the Java side to route to it. Templated shape (extends the existing pattern from convert_tz): Rust side: udf::mvzip::MvzipUdf — Signature::user_defined; coerce_types pins the first two args to ListArray and the optional 3rd to Utf8; invoke_with_args iterates per row, takes min(len(left), len(right)) elements, stringifies each (matching `Objects.toString(elem, "")` for null elements), and builds a List<Utf8>. Defensive Null-element-type arm handles the empty array case before the SQL-plugin VARCHAR-default kicks in. Registered on each session context via udf::register_all alongside convert_tz. 7 unit tests cover the basic / custom-sep / truncation / null-element / null-array / empty-array / numeric-array shapes. Java side: ScalarFunction.MVZIP enum entry (SqlKind.OTHER_FUNCTION; resolves through identifier-name valueOf("MVZIP") since PPL's MVZipFunctionImpl registers under the function name "mvzip"). MvzipAdapter — locally-declared SqlFunction("mvzip") + ADDITIONAL_SCALAR_SIGS bridge so isthmus emits a Substrait scalar function call with the exact name the Rust UDF is registered under. DataFusionAnalyticsBackendPlugin: ARRAY_RETURNING_PROJECT_OPS membership (returns ARRAY<VARCHAR>, registered against FieldType.ARRAY); adapter registration in scalarFunctionAdapters(). opensearch_array_functions.yaml: two impls for arity-2 and arity-3. # Pass-rate (CalciteArrayFunctionIT, force-routed) * Before: 28/60. * After: 34/60. Newly passing — all 5 testMvzip* variants: testMvzipBasic, testMvzipWithCustomDelimiter, testMvzipNested, testMvzipWithEmptyArray, testMvzipWithBothEmptyArrays. (Test count delta is +6 because the test class also exercises mvzip in 1 other test under a different name, picked up by the same fix.) # Companion changes This PR's run also picks up the SQL-plugin companion #5421 which defaults empty `array()` to ARRAY<VARCHAR>. Without that companion the testMvzipWith*EmptyArray variants would still fail — substrait would reject the input ARRAY<NULL> type before reaching the UDF. The Rust UDF's Null-element arm exists as a defensive backstop in case the call ever reaches it with a null-typed list. Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 9a4587a commit c059c38

7 files changed

Lines changed: 523 additions & 4 deletions

File tree

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/ScalarFunction.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,15 @@ public enum ScalarFunction {
158158
* is {@code array_element}, also 1-based; the DataFusion backend renames via a
159159
* name-mapping adapter.
160160
*/
161-
ITEM(Category.SCALAR, SqlKind.ITEM);
161+
ITEM(Category.SCALAR, SqlKind.ITEM),
162+
/**
163+
* PPL {@code mvzip(left, right [, sep])} — element-wise zip of two arrays into an
164+
* array of strings, joined per pair by a separator (default {@code ","}). Resolves
165+
* through the SQL plugin's {@code MVZipFunctionImpl} UDF named {@code "mvzip"}.
166+
* No DataFusion stdlib equivalent — the analytics-backend-datafusion plugin ships
167+
* a custom Rust UDF (`udf::mvzip`) registered on its session context.
168+
*/
169+
MVZIP(Category.SCALAR, SqlKind.OTHER_FUNCTION);
162170

163171
/**
164172
* Category of scalar function.

sandbox/plugins/analytics-backend-datafusion/rust/src/udf/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,12 @@ pub(crate) fn coerce_args(
113113
}
114114

115115
pub mod convert_tz;
116+
pub mod mvzip;
116117

117118
pub fn register_all(ctx: &SessionContext) {
118119
convert_tz::register_all(ctx);
119-
log::info!("OpenSearch UDF register_all: convert_tz registered");
120+
mvzip::register_all(ctx);
121+
log::info!("OpenSearch UDF register_all: convert_tz, mvzip registered");
120122
}
121123

122124
#[cfg(test)]

0 commit comments

Comments
 (0)