
Commit 4dea6c4

[Analytics Backend / DataFusion] Onboard PPL array constructor and 8 multivalue (mv) functions to analytics-engine route (opensearch-project#21554)
* [Analytics Backend / DataFusion] Wire PPL array constructor + array_length / array_slice / array_distinct / mvjoin

Onboards the PPL `array(a, b, …)` constructor and four array-consuming functions to the analytics-engine route by mapping their Calcite lowering targets through Substrait to DataFusion's native make_array / array_length / array_slice / array_distinct / array_to_string.

Same templated shape as the `replace` PR (opensearch-project#21527), with two extensions: ScalarFunction enum constants (5) + STANDARD_PROJECT_OPS / ARRAY_RETURNING_PROJECT_OPS membership + opensearch_array_functions.yaml extension entries + ADDITIONAL_SCALAR_SIGS Calcite-op→Substrait-name bridges + scalarFunctionAdapters() entries for the 3 functions that need operand normalization = onboarded to the analytics route.

Capability lookup at OpenSearchProjectRule keys on the call's return type; for array-returning functions (`array(...)`, `array_slice`, `array_distinct`) the return type resolves to `SqlTypeName.ARRAY`, which previously hit `default → null` in `FieldType.fromSqlTypeName` and emptied the viable-backend list before the registration could match.
* `FieldType.ARRAY` added to the analytics SPI enum.
* `SqlTypeName.ARRAY → FieldType.ARRAY` mapping in `fromSqlTypeName`.
* `ARRAY_RETURNING_PROJECT_OPS` registered against `Set.of(FieldType.ARRAY)` only — separate from `STANDARD_PROJECT_OPS` so `FieldType.ARRAY` doesn't pollute filter / aggregate capabilities (no meaningful semantics over array-typed values there).
* `ArrowSchemaFromCalcite.toArrowField` recurses into the component type to build the matching Arrow `List<inner>` field — without this the result schema would have a bare `List` with no element field and the backend's Arrow IPC reader would fail to bind result columns.

Substrait's standard catalog has no array_* entries, so isthmus' `RexExpressionConverter` would fail with "Unable to convert call …" on every array call. New `opensearch_array_functions.yaml` declares:
* `make_array(any1, …)` → `list<any1>` (variadic, min: 0).
* `array_length(list<any1>)` → `i64?`.
* `array_slice(list<any1>, i64, i64)` → `list<any1>` (with i32 fallback).
* `array_distinct(list<any1>)` → `list<any1>`.
* `array_to_string(list<any1>, string)` → `string?` (with varchar fallback).
Loaded via `SimpleExtension.load("/opensearch_array_functions.yaml")` and merged into the plugin's extension collection in `DataFusionPlugin.loadSubstraitExtensions()`.

Substrait's call-conversion path (and DataFusion's signature matcher) is strict about operand types in ways Calcite's PPL lowering doesn't naturally satisfy. Three adapters bridge the gap:
* `MakeArrayAdapter` — implements `ScalarFunctionAdapter` directly (not `AbstractNameMappingAdapter`). PPL's `ArrayFunctionImpl` infers `ARRAY<commonElementType>` for the call's return type but does NOT widen the individual operand types. So `array(1, 1.5)` produces a RexCall whose operands are `(INTEGER, DECIMAL(2,1))` but whose return type is `ARRAY<DOUBLE>`. Substrait's variadic-`any1` consistency validator throws an `AssertionError` in that case (not a recoverable exception — it fatally exits the search-thread JVM). The adapter extracts the call's component type and CASTs each non-matching operand to it before emission.
* `ArrayToStringAdapter` — declares a local `array_to_string` op and name-maps `SqlLibraryOperators.ARRAY_JOIN` → it.
* `ArraySliceAdapter` — passes the `ARRAY_SLICE` call through unchanged but coerces the index operands (positions 1, 2, optional 3) to `BIGINT`.
PPL's parser types positive integer literals as `DECIMAL(20,0)`; DataFusion's `array_slice` signature accepts only integer indexes and refuses to coerce decimal arguments.

Two third-party dependencies surfaced as fatal `NoClassDefFoundError` during execution of array-returning calls:
* `commons-text` to analytics-engine — Calcite's `SqlFunctions` class statically references `org.apache.commons.text.similarity.LevenshteinDistance`. Without it, any Calcite RelNode walk that touches `SqlFunctions.<clinit>` poisons the search-thread JVM.
* `jackson-datatype-jsr310` to **arrow-flight-rpc** (the parent plugin that bundles `arrow-vector`). `arrow-vector`'s `JsonStringArrayList` eagerly registers `JavaTimeModule` on its ObjectMapper in `<clinit>`, so any reader of an Arrow `ListVector` (i.e. every array-returning DataFusion call flowing through analytics-engine) hits a fatal NoClassDefFoundError. The dep belongs on arrow-flight-rpc's classpath because that plugin defines arrow-vector's classloader; bundling it in analytics-backend-datafusion (the child plugin) is invisible to arrow-vector. Marked `compileOnly` in analytics-backend-datafusion to avoid jar-hell with arrow-flight-rpc's `api` dependency.

* Before: 1/60 (testArrayWithMix only — exercises an error path that fails before the ARRAY capability lookup).
* After: 9/60. Newly passing: testArray, testArrayWithString, testArrayLength, testMvjoinWithStringArray, testMvjoinWithStringifiedNumbers, testMvjoinWithMixedStringValues, testMvjoinWithStringBooleans, testMvjoinWithSpecialDelimiters, testMvjoinWithArrayFromRealFields, testMvjoinWithMultipleRealFields.

The remaining 51 failures fall into three buckets:
* 50 — out-of-scope S1+ functions (`mvfind`, `mvzip`, `reduce`, `transform`, `forall`, `filter`, `exists`, `ITEM`). These are PPL UDFs without direct DataFusion equivalents and need either lambda-substrait wiring or custom UDF registration on the Rust side.
* 5 — the `testMvindexRange*` family. PPL's `mvindex(arr, from, to)` lowers to `ARRAY_SLICE(arr, from+1, to+1)` (1-based shift) but the lowering is missing the +1, so DataFusion's 1-based array_slice returns a window shifted by one. The fix belongs in the SQL plugin's PPL→Calcite lowering layer.
* 1 — `testMvindexRangeMixed` JSON formatting mismatch (the test expects a bare `[a,b,c]` but the response is `"[\"a\",\"b\",\"c\"]"`).

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* [Analytics Backend / DataFusion] Fix ARRAY_SLICE 0-based-(start, length) → 1-based-(start, end) for DataFusion

Calcite's `SqlLibraryOperators.ARRAY_SLICE` is the Spark / Hive flavor — 0-based start, third arg is the number of elements to take. PPL's `MVIndexFunctionImp.resolveRange` (in the SQL plugin) emits this form, e.g. `mvindex(arr=[1..5], 1, 3)` → `ARRAY_SLICE(arr, 1, 3)`, meaning "start at 0-based position 1, take 3 elements" → expected `[2, 3, 4]`. DataFusion's native `array_slice` is the Postgres / Snowflake flavor — 1-based start, third arg is the inclusive end index — so the same call `array_slice(arr, 1, 3)` returns the elements at 1-based positions 1..3 → `[1, 2, 3]`: an off-by-one across every `mvindex` range query.

Convert the operands in the adapter rather than in the SQL plugin's PPL lowering, because the lowering's existing semantics are correct for Calcite's local executor (used by every non-analytics path); the bug is only in the bridge to DataFusion.
The conversion applied in the adapter:

    start' = start + 1
    end'   = start + length   (== start + 1 + (length - 1))

`MVIndexFunctionImp` already normalizes negative indexes to non-negative 0-based positions before invoking ARRAY_SLICE (it uses `arrayLen + idx`), so the arithmetic above applies uniformly. Empirically: `mvindex(arr=[1..5], 1, 3)` now returns the correct values `[2, 3, 4]` (was `[1, 2, 3]`); the negative form `mvindex(arr, -3, -1)` returns `[3, 4, 5]` (was `[2, 3]`); the mixed form `mvindex(arr, -4, 2)` returns `[2, 3]`, matching the PPL spec.

The 5 `testMvindexRange*` tests still don't pass on the IT, but for an unrelated reason — array-typed result values are being returned as JSON-stringified scalars (`"[2,3,4]"`) instead of typed arrays. That's a response-formatting issue affecting every array-returning test (also `testArray`, `testArrayWithString`) and lives in a different code path; it'll be addressed separately.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* [Analytics Backend / DataFusion] Wire PPL ITEM (mvindex single-element) → DataFusion array_element

PPL's `mvindex(arr, N)` single-element form lowers (in `MVIndexFunctionImp.resolveSingleElement`) to Calcite's `SqlStdOperatorTable.ITEM` operator with a 1-based index (already converted from PPL's 0-based input). DataFusion's native single-element array accessor is `array_element` (also 1-based), so a name-mapping adapter + a yaml extension entry are sufficient.

Templated shape:

    ScalarFunction.ITEM (SqlKind.ITEM)
    + STANDARD_PROJECT_OPS membership (returns the array's element type, which resolves through the
      existing FieldType.fromSqlTypeName → SUPPORTED_FIELD_TYPES capability lookup for non-array
      element types — array-of-array is rare in PPL and not exercised by the current test surface)
    + scalarFunctionAdapters() entry → ArrayElementAdapter
      ↳ rewrites SqlStdOperatorTable.ITEM to a locally-declared SqlFunction named "array_element"
      ↳ coerces the index operand to BIGINT (PPL's parser produces DECIMAL for positive integer
        literals; DataFusion's array_element rejects DECIMAL indexes, same as array_slice)
    + ADDITIONAL_SCALAR_SIGS bridge for the locally-declared op
    + opensearch_array_functions.yaml extension entry: array_element(list<any1>, i64) → any1?

# Pass-rate (CalciteArrayFunctionIT, force-routed)
* Before this commit: 9/60.
* After this commit: 12/60. Newly passing: testMvindexSingleElementPositive, testMvindexSingleElementNegative, testMvindexSingleElementNegativeMiddle.

The other 3 tests that hit the ITEM rejection (testMvfindWith*) are multi-step queries where ITEM is one node in a tree that also includes unrelated S1+ functions (mvfind/mvzip/etc.); they remain blocked by the upstream functions, not by ITEM itself.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* [Analytics Engine] Carry array-typed cells through RowResponseCodec without JSON-stringifying

The row-oriented fragment-execution wire format (`FragmentExecutionResponse`, used when arrow-flight streaming is disabled — every single-node test cluster today) shipped each cell through OpenSearch's `writeGenericValue` / `readGenericValue`, which preserves `List` values as `ArrayList<Object>`. On the coordinator side, `RowResponseCodec.decode` then re-materialized the rows into a `VectorSchemaRoot` for `Iterable<VectorSchemaRoot>`-style consumers. Two bugs in that re-materialization were eating array values:

1. `inferArrowType` walked rows for the first non-null cell and matched against {Long, Integer, …, CharSequence, byte[], Number}.
{@code List} wasn't in the chain, so it fell through to {@code break} and the fallback {@link ArrowType.Utf8} — every array column became a VARCHAR column.
2. `setVectorValue` for {@link VarCharVector} called {@code value.toString()}. For a {@code JsonStringArrayList}, that returns the JSON form {@code "[2,3,4]"}, which then got serialized as a JSON string in the final response.

Tests like {@code testMvindexRangePositive} saw their array result come back as a string `"[2,3,4]"` instead of an array `[2, 3, 4]`.

Fix:
* Replace {@code inferArrowType} with {@code inferField}, which returns a full {@link Field}. For {@code List} cells, build a list field with the inner element type inferred from the first non-null element (with a fallback that scans later rows in case the first list is empty/all-null); a sketch of this shape follows the SHA/LICENSE entry below.
* Add a {@code ListVector} arm to {@code setVectorValue} that delegates to a new {@code writeListValue}. The writer bypasses {@link UnionListWriter} entirely — it writes directly to the list's offset / validity buffers and to the inner data vector via the inner vector's typed `setSafe`. The writer-based API requires per-element `ArrowBuf` allocations for varchar elements that are easy to leak or use-after-free; the direct path is simpler and avoids both classes of bug.

Plus a separate Arrow gotcha that surfaced once arrays started flowing through correctly:
* {@code ListVector.getObject} for a {@code VarCharVector} child returns a {@code JsonStringArrayList} whose elements are Arrow's {@link Text} class, not Java {@link String}. {@code ExprValueUtils.fromObjectValue} doesn't recognize {@code Text} and threw "unsupported object class org.apache.arrow.vector.util.Text". {@code ArrowValues.toJavaValue} now mirrors its top-level VarChar branch for list cells: when a list value comes back from a {@code ListVector}, normalize each {@code Text} element to a {@link String} before handing the list upward.

* Before: 12/60 (mvindex range tests still showed an expected-vs-actual diff because `[2,3,4]` came back as a JSON string, not an array).
* After: 26/60. Newly passing: testMvindexRangePositive, testMvindexRangeNegative, testMvindexRangeMixed, testMvindexRangeFirstThree, testMvindexRangeLastThree, testMvindexRangeSingleElement, testMvdedupWithDuplicates, testMvdedupWithAllDuplicates, testMvdedupWithNoDuplicates, testMvdedupWithStrings, testArrayWithString, testSplitWithSemicolonDelimiter, testSplitWithMultiCharDelimiter, testSplitWithEmptyDelimiter.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* [Analytics] Add SHA + LICENSE files for new bundled deps; spotless

The `dependencyLicenses` precommit task scans `licenses/` for a `<jar>.sha1` sibling per bundled dependency. Two deps added in this PR were missing them:
* `commons-text-1.11.0` in analytics-engine — needs sha1 + LICENSE + NOTICE (no shared `commons-text-*` license files yet in this plugin). Apache 2.0; LICENSE and NOTICE extracted from the released jar.
* `jackson-datatype-jsr310-2.21.3` in arrow-flight-rpc — sha1 only. arrow-flight-rpc's `dependencyLicenses` already maps `jackson-.*` to the shared `jackson-LICENSE` / `jackson-NOTICE` files via `mapping from: /jackson-.*/, to: 'jackson'`, so no new license/notice files are needed.

Plus a googleJavaFormat reflow on `ArraySliceAdapter` and `DataFusionPlugin` that spotlessCheck flagged in precommit.

Verified `:plugins:arrow-flight-rpc:precommit`, `:sandbox:plugins:analytics-engine:precommit`, and `:sandbox:plugins:analytics-backend-datafusion:precommit` all succeed.

Addresses review feedback on opensearch-project#21554.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
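[Editorial illustration] A minimal sketch of the {@code inferField} list handling described in the RowResponseCodec entry above, using Arrow's Java pojo types. The helper name, element-type policy, and child-field name are assumptions for illustration, not the plugin's actual code.

import java.math.BigDecimal;
import java.util.List;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;

final class ListFieldSketch {
    /** Build a List<inner> field for a column whose cells are java.util.List values. */
    static Field listField(String columnName, Object firstNonNullElement) {
        ArrowType elementType;
        if (firstNonNullElement instanceof CharSequence) {
            elementType = new ArrowType.Utf8();
        } else if (firstNonNullElement instanceof Double
                || firstNonNullElement instanceof Float
                || firstNonNullElement instanceof BigDecimal) {
            // BigDecimal promoted to FloatingPoint(DOUBLE), matching the next entry below.
            elementType = new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
        } else {
            elementType = new ArrowType.Int(64, true); // integral fallback
        }
        // The list field must carry its element as a child; a bare List with no element
        // field is exactly the shape the Arrow IPC reader fails to bind.
        Field element = new Field("element", FieldType.nullable(elementType), List.of());
        return new Field(columnName, FieldType.nullable(ArrowType.List.INSTANCE), List.of(element));
    }
}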
* [Analytics Engine] Map BigDecimal cells to FloatingPoint in row-codec inference

{@code RowResponseCodec.scalarArrowType} ordered its instanceof checks {Long, Integer, Short, Byte, Double, Float, Boolean, CharSequence, byte[], Number(fallback) → Int(64)}. BigDecimal extends {@link Number} but isn't any of the typed scalar arms, so it fell through to the {@code Number} fallback and got encoded as a 64-bit integer column — silently truncating fractional digits.

This bites PPL flows whose common element type is {@code DECIMAL} (e.g. {@code array(1, -1.5, 2, 1.0)} — the v2-side {@code ArrayImplementor.internalCast} explicitly maps the DECIMAL target to BigDecimal cells). The element values {@code -1.5} and {@code 1.0} round to {@code -1} and {@code 1} when forced through Int(64), so the array reads back as {@code [1, -1, 2, 1]} instead of {@code [1, -1.5, 2, 1.0]}.

Promote BigDecimal cells to FloatingPoint(DOUBLE) — the same precision the v2 engine uses for decimal-typed PPL results, so behavior matches across both execution paths. The list writer's {@code Float8Vector} arm already uses {@code ((Number) element).doubleValue()}, which correctly extracts the fractional value from a BigDecimal.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* [Analytics Backend / DataFusion] Onboard PPL mvzip via custom Rust UDF

PPL `mvzip(left, right [, sep])` element-wise zips two arrays into a list of strings, joined per pair by a separator (default `,`). DataFusion has no stdlib equivalent — `array_concat` is end-to-end concatenation, and Substrait's lambda support is too thin for a transform/zip rewrite — so this onboards a custom Rust ScalarUDF on the analytics-backend-datafusion plugin's session context and wires the Java side to route to it.

Templated shape (extends the existing pattern from convert_tz):

Rust side: udf::mvzip::MvzipUdf — Signature::user_defined; coerce_types pins the first two args to ListArray and the optional 3rd to Utf8; invoke_with_args iterates per row, takes min(len(left), len(right)) elements, stringifies each (matching `Objects.toString(elem, "")` for null elements), and builds a List<Utf8>. A defensive Null-element-type arm handles the empty-array case before the SQL-plugin VARCHAR default kicks in. Registered on each session context via udf::register_all alongside convert_tz. 7 unit tests cover the basic / custom-sep / truncation / null-element / null-array / empty-array / numeric-array shapes.

Java side: ScalarFunction.MVZIP enum entry (SqlKind.OTHER_FUNCTION; resolves through identifier-name valueOf("MVZIP") since PPL's MVZipFunctionImpl registers under the function name "mvzip"). MvzipAdapter — locally-declared SqlFunction("mvzip") + ADDITIONAL_SCALAR_SIGS bridge so isthmus emits a Substrait scalar function call with the exact name the Rust UDF is registered under. DataFusionAnalyticsBackendPlugin: ARRAY_RETURNING_PROJECT_OPS membership (returns ARRAY<VARCHAR>, registered against FieldType.ARRAY); adapter registration in scalarFunctionAdapters(). opensearch_array_functions.yaml: two impls for arity-2 and arity-3.

* Before: 28/60.
* After: 34/60. Newly passing — all 5 testMvzip* variants: testMvzipBasic, testMvzipWithCustomDelimiter, testMvzipNested, testMvzipWithEmptyArray, testMvzipWithBothEmptyArrays. (The test-count delta is +6 because the test class also exercises mvzip in 1 other test under a different name, picked up by the same fix.)
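[Editorial illustration] A plain-Java sketch of the per-row zip semantics described above — truncate to the shorter array, stringify each element with null → "", join each pair with the separator. The shipped implementation is the Rust udf::mvzip ScalarUDF; this helper class is not part of the PR.

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

final class MvzipSketch {
    static List<String> mvzip(List<?> left, List<?> right, String sep) {
        int n = Math.min(left.size(), right.size()); // truncate to the shorter array
        List<String> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            // null elements stringify to "", matching Objects.toString(elem, "")
            out.add(Objects.toString(left.get(i), "") + sep + Objects.toString(right.get(i), ""));
        }
        return out;
    }

    public static void main(String[] args) {
        // mvzip(["a","b","c"], ["1","2"], "-") → [a-1, b-2]
        System.out.println(mvzip(List.of("a", "b", "c"), List.of("1", "2"), "-"));
    }
}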
This PR's run also picks up the SQL-plugin companion #5421, which defaults empty `array()` to ARRAY<VARCHAR>. Without that companion the testMvzipWith*EmptyArray variants would still fail — substrait would reject the input ARRAY<NULL> type before reaching the UDF. The Rust UDF's Null-element arm exists as a defensive backstop in case the call ever reaches it with a null-typed list.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* [Analytics Backend / DataFusion] Onboard PPL mvfind via custom Rust UDF

PPL `mvfind(arr, regex)` finds the 0-based index of the first array element matching a regex pattern (Java `Matcher.find` substring-match semantics), or NULL if no match. DataFusion has no stdlib equivalent, and rewriting in terms of array_position requires per-element regex evaluation that's only expressible with substrait lambda support — out of scope here. Onboards a custom Rust ScalarUDF on the analytics-backend-datafusion plugin's session context, mirroring the mvzip/convert_tz pattern.

Templated shape:

Rust side: udf::mvfind::MvfindUdf — Signature::user_defined; coerce_types pins arg 0 to a list type and arg 1 to Utf8; invoke_with_args walks each row and finds the first non-null element whose stringified form matches the regex via Rust's `regex` crate (`Regex::is_match` is unanchored, same as Java's `Matcher.find`). Scalar pattern operands compile once up front and surface invalid-regex errors at plan time (mirrors the SQL plugin's plan-time `tryCompileLiteralPattern`); column-valued patterns compile per row and yield NULL for invalid patterns. Supports list element types Utf8 / Int{8,16,32,64} / UInt{8,16,32,64} / Float{32,64} / Boolean / Null. 7 unit tests cover the basic-match / no-match / null-array / empty-array / null-element / numeric-array / unanchored shapes. Registered on each session context via udf::register_all alongside convert_tz and mvzip.

Java side: ScalarFunction.MVFIND enum entry (SqlKind.OTHER_FUNCTION; resolves through identifier-name valueOf("MVFIND") since PPL's MVFindFunctionImpl registers under the function name "mvfind"). MvfindAdapter — locally-declared SqlFunction("mvfind") + ADDITIONAL_SCALAR_SIGS bridge so isthmus emits a Substrait scalar function call with the exact name the Rust UDF is registered under. DataFusionAnalyticsBackendPlugin: STANDARD_PROJECT_OPS membership (returns INTEGER, registered against the existing scalar SUPPORTED_FIELD_TYPES); adapter registration in scalarFunctionAdapters(). opensearch_array_functions.yaml: arity-2 impl returning `i32?`.

* Before: 34/60.
* After: 42/60. Newly passing — 8 of 9 testMvfind* variants: testMvfindWithMatch, testMvfindWithFirstMatch, testMvfindWithMultipleMatches, testMvfindWithNoMatch, testMvfindWithEmptyArray, testMvfindWithNumericArray, testMvfindWithCaseInsensitive, testMvfindWithComplexRegex.

Remaining mvfind failure: testMvfindWithDynamicRegex — fails with "Unable to convert call CONCAT(string, string)" because the test computes the pattern via `concat('ban', '.*')` and substrait can't bind the CONCAT call. This is a separate analytics-engine CONCAT type-conversion issue, not mvfind-specific.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* [Analytics Backend / DataFusion] Onboard PPL mvappend via custom Rust UDF

PPL `mvappend(arg1, arg2, …)` flattens a mixed list of array and scalar arguments into one array, dropping null arguments and null elements within array arguments.
DataFusion's `array_concat` is the closest stdlib match but only accepts arrays (not mixed array+scalar) and preserves nulls — different semantics. Onboards as a custom Rust ScalarUDF on the analytics-backend-datafusion plugin's session context, mirroring the mvzip / mvfind pattern.

Templated shape:

Rust side: udf::mvappend::MvappendUdf — Signature::user_defined; per-row walk over the operands, skipping NULL args and NULL elements inside array args, with explicit Arrow type arms for {Int8/16/32/64, UInt8/16/32/64, Float32/64, Boolean, Utf8/LargeUtf8/Utf8View}. The string arms output List<Utf8> or List<Utf8View> depending on the inferred element type so the result schema matches what `return_type` declared (DataFusion's execution-time schema check rejects mismatches). A defensive Null element-type arm covers the empty-array shape. 6 unit tests. Registered on each session context via udf::register_all.

Java side: ScalarFunction.MVAPPEND enum entry (SqlKind.OTHER_FUNCTION; resolves through identifier-name valueOf("MVAPPEND")). MvappendAdapter — locally-declared SqlFunction("mvappend") + ADDITIONAL_SCALAR_SIGS bridge. Casts every scalar operand to the call's array component type and every array operand to ARRAY<componentType> before substrait emission, so the UDF sees a single uniform element type across all positions. DataFusionAnalyticsBackendPlugin: ARRAY_RETURNING_PROJECT_OPS membership (returns ARRAY<commonType>); adapter registration in scalarFunctionAdapters(). opensearch_array_functions.yaml: variadic min:1 entry with `list<any1?>` return type.

* Before: 0/15.
* After: 6/15. Newly passing: testMvappendWithMultipleElements, testMvappendWithSingleElement, testMvappendWithArrayFlattening, testMvappendWithStringValues, testMvappendWithNestedArrays, testMvappendWithRealFields.

Remaining failures:
* 8 tests fail with "Unable to convert the type ANY". Root cause is PPL's MVAppendFunctionImpl.updateMostGeneralType using strict Object.equals on each pair of operand types, returning Calcite's ANY type when any two don't match — including when they only differ in nullability tag (a literal 3 is INTEGER NOT NULL but the component type of `array(1, 2)` is INTEGER NULLABLE). Substrait can't serialize ANY. The fix belongs in the SQL plugin's MVAppendFunctionImpl (use typeFactory.leastRestrictive instead of Object.equals) and isn't addressed here.
* testMvappendInWhereClause — uses `where array_length(combined) = 2`, which the analytics-engine planner rejects with "No backend can evaluate filter predicate [EQUALS] on fields [combined:ARRAY]". Filter-side capability gap unrelated to mvappend.
* testMvappendWithComplexExpression — fails substrait conversion on a nested mvappend call ("Unable to convert call mvappend(list, …)"), likely the same nullability-widening pattern flowing through nested calls. The same upstream fix applies.

CalciteArrayFunctionIT is unchanged at 43/60 — mvappend isn't exercised there.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* [Analytics Backend / DataFusion] Reshape mvappend operands as uniform lists; add Decimal128 element support

Two follow-ons to the initial mvappend onboarding (40b2161), both surfaced once the SQL companion opensearch-project#5424 (`MVAppendFunctionImpl.leastRestrictive`) let homogeneous-type calls reach substrait conversion.

# Uniform-list operand reshape

Substrait's variadic-`any1` argument shape requires every operand at the same variadic position to share a type.
PPL's `mvappend(arg, …)` accepts a mix of bare scalars and arrays, which substrait's signature matcher rejected with `Unable to convert call mvappend(list<i32?>, i32?, i32?)`. `MvappendAdapter` now wraps each scalar operand in a singleton `make_array(scalar)` call (using the locally-declared `MakeArrayAdapter.LOCAL_MAKE_ARRAY_OP`) so that by the time the substrait converter sees the operands they're uniformly `list<componentType>`. The yaml impl was correspondingly tightened from `args: [{ value: any1 }] variadic` to `args: [{ value: list<any1?> }] variadic`. The Rust UDF (`udf::mvappend`) keeps its scalar-handling branch intact as a defensive fallback, but in practice every operand it sees is now a list.

# Decimal128 element type

Calcite's leastRestrictive widening on INT + DECIMAL produces DECIMAL(p, s), which substrait converts to Decimal128(p, s); the Java adapter casts every operand's element type to that. The Rust UDF needed an explicit `DataType::Decimal128(p, s)` branch — Decimal128Builder requires `.with_precision_and_scale(p, s)` configuration before use, and Decimal128Array elements are read via the `i128`-valued `value(i)` accessor (not via the generic `build!` macro).

# Pass-rate (CalciteMVAppendFunctionIT, force-routed, with companion opensearch-project#5424 applied)
* Before this commit: 6/15 (initial mvappend onboarding).
* After this commit: 10/15. Newly passing: testMvappendWithMixedArrayAndScalar (uniform-list reshape), testMvappendWithComplexExpression (uniform-list reshape), testMvappendWithIntAndDouble (Decimal128 element), testMvappendWithNumericArrays (Decimal128 element).

Remaining 5 failures:
* testMvappendWithMixedTypes / WithFieldsAndLiterals / WithEmptyArray / WithNull — the call legitimately widens to ARRAY<ANY> because the operands contain pairs of types with no common widened type (INT + VARCHAR). The Calcite engine handles ANY via Object generic dispatch; substrait can't encode it. Out of scope without changing PPL UDF semantics.
* testMvappendInWhereClause — uses `where array_length(combined) = 2`, which the analytics-engine planner rejects with "No backend can evaluate filter predicate [EQUALS] on fields [combined:ARRAY]". Filter-side capability gap unrelated to mvappend.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* [Analytics Backend / DataFusion] Register UDFs on FFM-created session contexts

create_session_context (the Rust-side builder behind df_create_session_context) built a fresh DataFusion SessionContext but never called udf::register_all on it. Every fragment query routed through df_execute_with_context reused that handle's ctx via query_executor::execute_with_context, so substrait function references to mvappend / mvfind / mvzip / convert_tz failed planning with "This feature is not implemented: Unsupported function name". The matching register_all call exists in execute_query / local_executor / indexed_executor — this just brings the FFM session-context path to parity.

Verified: CalciteMVAppendFunctionIT against the analytics-engine route now passes 10/15 (was 0/15) with the SQL companion opensearch-project#5424 widening fix applied. The remaining 5 are pre-existing ARRAY<ANY>/UNKNOWN substrait-encoding gaps (heterogeneous mvappend signatures, empty-array default, filter-on-array predicate) tracked in this PR's "What's left" section.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
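[Editorial illustration] A minimal sketch of the scalar-wrapping reshape from the mvappend reshape entry above, assuming Calcite's RexBuilder API. The helper signature is illustrative and the operator handle stands in for MakeArrayAdapter.LOCAL_MAKE_ARRAY_OP; this is not MvappendAdapter's actual code.

import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.SqlTypeName;

final class MvappendReshapeSketch {
    /** Wrap every scalar operand in a singleton make_array(CAST(scalar AS component)) call. */
    static List<RexNode> asUniformLists(
            RexBuilder rexBuilder, SqlOperator localMakeArrayOp,
            RelDataType componentType, RelDataType arrayType, List<RexNode> operands) {
        List<RexNode> reshaped = new ArrayList<>(operands.size());
        for (RexNode op : operands) {
            if (op.getType().getSqlTypeName() == SqlTypeName.ARRAY) {
                reshaped.add(rexBuilder.makeCast(arrayType, op));             // ARRAY<component>
            } else {
                RexNode widened = rexBuilder.makeCast(componentType, op);     // scalar → component
                reshaped.add(rexBuilder.makeCall(localMakeArrayOp, widened)); // list<component>
            }
        }
        return reshaped;
    }
}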
* [Analytics Backend / DataFusion] Don't let substrait AssertionError kill the cluster

Substrait's plan validators (VariadicParameterConsistencyValidator, RelOptUtil.eq via Litmus.THROW, etc.) throw AssertionError directly via explicit `throw new AssertionError(...)` rather than via the `assert` keyword, so the JVM -da flag doesn't gate them. When a malformed plan triggers one inside a search-thread call to SubstraitRelVisitor.apply, the AssertionError propagates uncaught up the analytics-engine fragment handler stack, OpenSearchUncaughtExceptionHandler classifies it as fatal, and the entire cluster JVM exits.

Wrap the visitor.apply call in a narrow try/catch that re-raises the AssertionError as IllegalStateException with the original message and cause preserved. The analytics-engine error path already buckets IllegalStateException at the fragment boundary into a normal HTTP 500 response — the cluster stays up and the failure shows in the per-query report instead.

This came up while diagnosing CalciteMVAppendFunctionIT failures: malformed ARRAY<ANY> plans were taking down the cluster mid-test instead of producing per-test failures, masking the underlying substrait conversion error.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* [QA] Add ArrayFunctionIT + MVAppendFunctionIT for analytics-engine REST path

Self-contained QA ITs in sandbox/qa/analytics-engine-rest exercising the PPL collection functions onboarded in this PR through POST /_analytics/ppl against a parquet-backed `calcs` dataset; no SQL plugin checkout required.

ArrayFunctionIT (22 tests):
- array constructor (mixed-numeric BigDecimal → Double promotion + int+string)
- array_length
- mvindex range (array_slice — 0-based-(start, length) → 1-based-(start, end))
- mvindex single (array_element via ITEM rename)
- mvdedup (array_distinct)
- mvjoin (array_to_string rename)
- mvzip (Rust UDF, default + custom delimiter + nested)
- mvfind (Rust UDF, match / no-match / dynamic regex via concat() Sig bridge)
- split (returns array)

MVAppendFunctionIT (6 tests):
- uniform-typed scalar variadic (multiple, single, string)
- array operands (flattening, nested string arrays)
- VARCHAR field references via real calcs row

Tests gated on SQL companion opensearch-project#5424 (testMvappendWith{IntAndDouble, MixedArrayAndScalar, NumericArrays, ComplexExpression}) are intentionally absent — they fail with "Unable to convert the type ANY" until MVAppendFunctionImpl's leastRestrictive widening + DECIMAL→DOUBLE promotion + operand pre-cast is published as unified-query-core. A top-of-class block lists them with a pointer back to opensearch-project#5424.

Lambda-based functions (transform, mvmap, reduce, forall, exists, filter) and empty-array operands are absent for the architectural reasons in this PR's "What's left" section: substrait extension YAML doesn't support declaring func<…> lambda-typed args, and array() defaults to ARRAY<UNKNOWN>, which substrait can't encode without #5421.

Local verification (per `docs/dev/ppl-analytics-engine-routing.md` SOP):
- :sandbox:qa:analytics-engine-rest:integTest --tests "*ArrayFunctionIT" — 22/22
- :sandbox:qa:analytics-engine-rest:integTest --tests "*MVAppendFunctionIT" — 6/6
- :check -p sandbox — all 718 tasks green

Signed-off-by: Kai Huang <ahkcs@amazon.com>

---------

Signed-off-by: Kai Huang <ahkcs@amazon.com>
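[Editorial illustration] The "don't let substrait AssertionError kill the cluster" entry above boils down to a narrow guard around the visitor call. A minimal sketch, assuming a generic conversion callable — the plugin's actual call site wraps SubstraitRelVisitor.apply and its concrete types differ.

import java.util.concurrent.Callable;

final class SubstraitConversionGuard {
    static <T> T convert(Callable<T> conversion) {
        try {
            return conversion.call();
        } catch (AssertionError e) {
            // Substrait's validators throw AssertionError explicitly, so the JVM's -da flag
            // doesn't gate them; re-raise as IllegalStateException so the analytics-engine
            // fragment boundary turns it into an HTTP 500 instead of a fatal JVM exit.
            throw new IllegalStateException(e.getMessage(), e);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}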
1 parent f97a81c commit 4dea6c4

29 files changed

Lines changed: 2866 additions & 33 deletions


plugins/arrow-flight-rpc/build.gradle

Lines changed: 5 additions & 0 deletions
@@ -36,6 +36,11 @@ dependencies {
    api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
    api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson}"
    api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson_annotations}"
+   // arrow-vector's JsonStringArrayList static-initializes a Jackson ObjectMapper that registers
+   // JavaTimeModule. Without jsr310 on arrow-flight-rpc's classpath, any reader of an Arrow
+   // ListVector (e.g. DataFusion's array-returning UDFs flowing through analytics-engine) hits
+   // a fatal NoClassDefFoundError that exits the JVM.
+   api "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${versions.jackson}"
    api "commons-codec:commons-codec:${versions.commonscodec}"

    // arrow flight dependencies.
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+a0958ebdaba836d31e5462ebc37b6349a0725ff9

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/FieldType.java

Lines changed: 11 additions & 1 deletion
@@ -55,7 +55,16 @@ public enum FieldType {
    NESTED("nested"),
    OBJECT("object"),
    FLAT_OBJECT("flat_object"),
-   COMPLETION("completion");
+   COMPLETION("completion"),
+   /**
+    * Array-typed expression result. Used for the return-type slot of array-producing scalar
+    * functions (PPL {@code array(…)}, {@code array_slice}, {@code array_distinct}). Has no
+    * OpenSearch mapping equivalent — arrays in OpenSearch are multi-value fields with the
+    * underlying element type, not a separate type. The mapping string is {@code "array"} as a
+    * placeholder; {@link #fromMappingType} keeps working unchanged because no source
+    * advertises that mapping string.
+    */
+   ARRAY("array");

    private final String mappingType;

@@ -117,6 +126,7 @@ public static FieldType fromSqlTypeName(SqlTypeName sqlTypeName) {
            case TIME, TIMESTAMP, TIMESTAMP_WITH_LOCAL_TIME_ZONE -> FieldType.DATE;
            case BOOLEAN -> FieldType.BOOLEAN;
            case BINARY, VARBINARY -> FieldType.BINARY;
+           case ARRAY -> FieldType.ARRAY;
            default -> null;
        };
    }
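[Editorial illustration] A quick check of what the new mapping buys the capability lookup described in the commit message; OpenSearchProjectRule's actual lookup code is not part of this diff, and the sketch class is hypothetical.

import org.apache.calcite.sql.type.SqlTypeName;
import org.opensearch.analytics.spi.FieldType;

final class ArrayCapabilitySketch {
    static boolean arrayReturnTypeResolves() {
        // Previously SqlTypeName.ARRAY fell through `default -> null`, emptying the
        // viable-backend list for array-returning projections before any registration
        // could match; it now resolves to the new FieldType.ARRAY constant.
        return FieldType.fromSqlTypeName(SqlTypeName.ARRAY) == FieldType.ARRAY;
    }
}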

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/ScalarFunction.java

Lines changed: 54 additions & 1 deletion
@@ -177,7 +177,60 @@ public enum ScalarFunction {
    JSON_EXTEND(Category.SCALAR, SqlKind.OTHER_FUNCTION),
    JSON_EXTRACT(Category.SCALAR, SqlKind.OTHER_FUNCTION),
    JSON_KEYS(Category.SCALAR, SqlKind.OTHER_FUNCTION),
-   JSON_SET(Category.SCALAR, SqlKind.OTHER_FUNCTION);
+   JSON_SET(Category.SCALAR, SqlKind.OTHER_FUNCTION),
+
+   // ── Array ────────────────────────────────────────────────────────
+   /**
+    * PPL {@code array(a, b, …)} constructor — resolves through the SQL plugin's
+    * {@code ArrayFunctionImpl} UDF named {@code "array"}. DataFusion's native
+    * equivalent is {@code make_array}, so a backend that supports this needs a
+    * name-mapping adapter (see {@code MakeArrayAdapter} in the DataFusion backend).
+    */
+   ARRAY(Category.SCALAR, SqlKind.OTHER_FUNCTION),
+   ARRAY_LENGTH(Category.SCALAR, SqlKind.OTHER_FUNCTION),
+   ARRAY_SLICE(Category.SCALAR, SqlKind.OTHER_FUNCTION),
+   ARRAY_DISTINCT(Category.SCALAR, SqlKind.OTHER_FUNCTION),
+   /**
+    * Calcite's {@code ARRAY_JOIN} — joins array elements with a separator. PPL
+    * {@code mvjoin} is registered to this operator. DataFusion's native equivalent
+    * is named {@code array_to_string}, so the DataFusion backend rewrites to that
+    * via a name-mapping adapter.
+    */
+   ARRAY_JOIN(Category.SCALAR, SqlKind.OTHER_FUNCTION),
+   /**
+    * Calcite's {@code SqlStdOperatorTable.ITEM} — element access ({@code arr[N]}).
+    * PPL's {@code mvindex(arr, N)} single-element form lowers through
+    * {@code MVIndexFunctionImp.resolveSingleElement} to ITEM with a 1-based index
+    * (already converted from PPL's 0-based input). DataFusion's native equivalent
+    * is {@code array_element}, also 1-based; the DataFusion backend renames via a
+    * name-mapping adapter.
+    */
+   ITEM(Category.SCALAR, SqlKind.ITEM),
+   /**
+    * PPL {@code mvzip(left, right [, sep])} — element-wise zip of two arrays into an
+    * array of strings, joined per pair by a separator (default {@code ","}). Resolves
+    * through the SQL plugin's {@code MVZipFunctionImpl} UDF named {@code "mvzip"}.
+    * No DataFusion stdlib equivalent — the analytics-backend-datafusion plugin ships
+    * a custom Rust UDF (`udf::mvzip`) registered on its session context.
+    */
+   MVZIP(Category.SCALAR, SqlKind.OTHER_FUNCTION),
+   /**
+    * PPL {@code mvfind(arr, regex)} — find the 0-based index of the first array
+    * element matching a regex, or NULL if no match. Resolves through the SQL
+    * plugin's {@code MVFindFunctionImpl} UDF named {@code "mvfind"}. No
+    * DataFusion stdlib equivalent — the analytics-backend-datafusion plugin
+    * ships a custom Rust UDF (`udf::mvfind`) registered on its session context.
+    */
+   MVFIND(Category.SCALAR, SqlKind.OTHER_FUNCTION),
+   /**
+    * PPL {@code mvappend(arg1, arg2, …)} — flatten a mixed list of array and
+    * scalar arguments into one array, dropping null args and null elements.
+    * Resolves through the SQL plugin's {@code MVAppendFunctionImpl} UDF named
+    * {@code "mvappend"}. DataFusion's {@code array_concat} only accepts arrays
+    * and preserves nulls, so the analytics-backend-datafusion plugin ships a
+    * custom Rust UDF ({@code udf::mvappend}) registered on its session context.
+    */
+   MVAPPEND(Category.SCALAR, SqlKind.OTHER_FUNCTION);

    /**
     * Category of scalar function.

sandbox/plugins/analytics-backend-datafusion/build.gradle

Lines changed: 5 additions & 0 deletions
@@ -73,6 +73,11 @@ dependencies {
    implementation "io.substrait:isthmus:0.89.1"
    implementation "io.substrait:core:0.89.1"
    implementation "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:${versions.jackson}"
+   // jackson-datatype-jsr310 — added to arrow-flight-rpc (the parent plugin that bundles
+   // arrow-vector). arrow-vector's JsonStringArrayList eagerly registers JavaTimeModule on
+   // its ObjectMapper, so jsr310 must be visible to arrow-vector's defining classloader,
+   // not this plugin's. compileOnly here would also work; runtime is provided by parent.
+   compileOnly "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${versions.jackson}"

    calciteCompile "com.google.guava:guava:${versions.guava}"
    calciteTestCompile "com.google.guava:guava:${versions.guava}"

sandbox/plugins/analytics-backend-datafusion/rust/Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -68,6 +68,8 @@ serde_json = { workspace = true, features = ["preserve_order"] }
# multi-path JSON-array output. Moving to 1.x is a follow-up once we can
# reproduce that distinction against the new API surface.
jsonpath-rust = "0.7"
+# mvfind UDF — regex matching against stringified array elements
+regex = "1.10"

[dev-dependencies]
criterion = { workspace = true }

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs

Lines changed: 4 additions & 0 deletions
@@ -111,6 +111,10 @@ pub async unsafe fn create_session_context(
        .build();

    let ctx = SessionContext::new_with_state(state);
+   // Register OpenSearch UDFs (mvappend, mvfind, mvzip, convert_tz, …) on this session
+   // so the substrait converter at execute_with_context can resolve their function names.
+   // Without this, fragment execution fails with "Unsupported function name" because
+   // df_execute_with_context reuses this handle's ctx instead of building a fresh one.
    crate::udf::register_all(&ctx);

    // Register default ListingTable for parquet scans

sandbox/plugins/analytics-backend-datafusion/rust/src/udf/mod.rs

Lines changed: 7 additions & 1 deletion
@@ -122,6 +122,9 @@ pub mod json_extend;
pub mod json_extract;
pub mod json_keys;
pub mod json_set;
+pub mod mvappend;
+pub mod mvfind;
+pub mod mvzip;
pub mod tonumber;
pub mod tostring;

@@ -141,10 +144,13 @@ pub fn register_all(ctx: &SessionContext) {
    json_extract::register_all(ctx);
    json_keys::register_all(ctx);
    json_set::register_all(ctx);
+   mvzip::register_all(ctx);
+   mvfind::register_all(ctx);
+   mvappend::register_all(ctx);
    tonumber::register_all(ctx);
    tostring::register_all(ctx);
    log::info!(
-       "OpenSearch UDF register_all: convert_tz, json_append, json_array_length, json_delete, json_extend, json_extract, json_keys, json_set, tonumber, tostring registered"
+       "OpenSearch UDF register_all: convert_tz, json_append, json_array_length, json_delete, json_extend, json_extract, json_keys, json_set, mvzip, mvfind, mvappend, tonumber, tostring registered"
    );
}