Skip to content

Commit 2c8bf2b

Browse files
committed
[analytics-engine] Wire PPL spath end-to-end through the analytics-engine route
Closes the analytics-engine gap for the PPL `spath` command. The path-mode variant (`spath path=...`) already worked via the existing `json_extract` wiring; this PR adds the auto-extract mode (`spath input=doc` → `JSON_EXTRACT_ALL` returning `MAP<VARCHAR, VARCHAR>`) and its downstream operators (ITEM lookup, WHERE on extracted values). | IT | Before | After | |---|---|---| | `sql/integ-test/.../CalcitePPLSpathCommandIT` (`-Dtests.analytics.force_routing=true -Dtests.analytics.parquet_indices=true`) | 0 / 16 | 16 / 16 | | `sql/integ-test/.../CalcitePPLSpathCommandIT` (default v2/Calcite route) | 16 / 16 | 16 / 16 (no regression) | | `sandbox/qa/analytics-engine-rest/.../SpathCommandIT` (new) | n/a | 16 / 16 | Baseline failure modes on the analytics-engine route: - 15 tests: `OpenSearchProjectRule.annotateExpr` → `No backend supports scalar function [JSON_EXTRACT_ALL] among [datafusion]`. - 1 test (`testSimpleSpath`): `EngineBackedIndexer.acquireReader` → `UnsupportedOperationException` (test-infra issue, fixed on the SQL plugin side in a paired PR). 1. **`json_extract_all` Rust UDF** (`sandbox/plugins/analytics-backend-datafusion/rust/src/udf/json_extract_all.rs`). ~550 lines + 16 unit tests. Returns Arrow `Map<Utf8, Utf8>`; mirrors `JsonExtractAllFunctionImpl`'s legacy contract (dot-path flatten, `{}` array marker, `[a, b, c]` merge format for duplicate keys / arrays, `"null"` literal for JSON nulls, malformed → empty map, top-level scalar → NULL). 2. **SPI enum additions** in `analytics-framework`: - `ScalarFunction.JSON_EXTRACT_ALL` enum constant. - `FieldType.MAP` enum constant + `case MAP -> FieldType.MAP` in `fromSqlTypeName`. 3. **Capability registrations** in `DataFusionAnalyticsBackendPlugin`: - New `MAP_RETURNING_PROJECT_OPS` set (mirrors `ARRAY_RETURNING_PROJECT_OPS`) registered with `FieldType.MAP`. Required because `OpenSearchProjectRule.resolveScalarViableBackends` keys on the call's return type, and JSON_EXTRACT_ALL's `MAP<VARCHAR, VARCHAR>` return wouldn't match `SUPPORTED_FIELD_TYPES`. - `STANDARD_FILTER_OPS` registered against `FieldType.MAP` so `where doc.user.name = 'John'` (which references the underlying MAP column through ITEM) survives the filter-rule's field-index-keyed viability check. - Adapter binding `ScalarFunction.JSON_EXTRACT_ALL → JsonExtractAllAdapter`. 4. **Substrait wiring**: - `opensearch_scalar_functions.yaml` — entries for `json_extract_all` and `map_extract`. - `DataFusionFragmentConvertor.ADDITIONAL_SCALAR_SIGS` — function mappings for both names. - `JsonFunctionAdapters.JsonExtractAllAdapter` — name-mapping adapter. 5. **`ITEM(Map, key)` dispatch** in `ArrayElementAdapter`. PPL's `result.user.name` lowers to `ITEM(JSON_EXTRACT_ALL(doc), 'user.name')`. Two transforms for the MAP-input branch: - Route to `map_extract` (DataFusion's native map accessor) instead of `array_element`. Since `map_extract` returns `List<value>` (maps permit duplicate keys), wrap the call in `array_element(..., 1)` to project the singleton list back to a scalar. - Coerce the lookup key (CHAR(N) literal) to VARCHAR before emission so it unifies with the substrait `any1` type-variable binding the YAML declares. 6. **`ArrowValues.MapVector` flattening** in `analytics-engine`. Arrow `MapVector` is laid out as `List<Struct{key, value}>`, so `MapVector.getObject(i)` returns a `JsonStringArrayList` of entry structs rather than a proper map. Reassemble into a `LinkedHashMap<String, Object>` (Text→String normalization on keys and values) so the SQL-plugin response marshaller sees the same shape as a legacy v2 `Map<String, Object>` column. 7. **`sandbox/plugins/analytics-engine/build.gradle`** — when `analytics-engine` is in the root `:run` task's `installedPlugins`, attach `opensearch.experimental.feature.transport.stream.enabled=true` to the `runTask` testCluster. Otherwise the analytics-engine + SQL-plugin co-install fails at boot with a duplicate-PPL-transport-handler Guice error. Keeping the override here (rather than in the central `gradle/run.gradle`) means regular runs that don't include analytics-engine never see the flag. 8. **QA-side `SpathCommandIT`** under `sandbox/qa/analytics-engine-rest/...`. Mirrors `CalcitePPLSpathCommandIT` one test method to one, sends queries via `POST /_analytics/ppl`, no SQL-plugin dependency. Verifies the full spath surface end-to-end (both modes, ITEM-on-MAP eval / where / stats / sort, edge cases). Four small datasets under `resources/datasets/spath_{simple,auto,cmd,null}/`. Every piece in this PR is reusable beyond `spath`: - The MAP_RETURNING_PROJECT_OPS pattern + MAP filter capability are generic for any future PPL function emitting a Calcite MAP RelDataType. - `ArrayElementAdapter`'s ITEM-on-MAP branch + the `map_extract` YAML entry handle every `result['key']` / `result.field` access on a map column, not just spath's. - `ArrowValues.MapVector` flattening unblocks any UDF returning `Map<Utf8, Utf8>` from the analytics-engine route. The SQL plugin side has a test-infrastructure change to ensure the v2 / Calcite IT's test indices get parquet-backed for the analytics-engine compatibility run: opensearch-project/sql#5441. ```bash JAVA_HOME=/path/to/temurin-25 ./gradlew :run -Dsandbox.enabled=true \ -PinstalledPlugins="['opensearch-job-scheduler:3.7.0.0-SNAPSHOT', \ 'arrow-flight-rpc', 'analytics-engine', 'parquet-data-format', \ 'analytics-backend-datafusion', 'analytics-backend-lucene', \ 'composite-engine', 'opensearch-sql-plugin:3.7.0.0-SNAPSHOT']" ./gradlew :sandbox:qa:analytics-engine-rest:integTest \ -Dsandbox.enabled=true --tests "*SpathCommandIT" ./gradlew :integ-test:integTestRemote \ -Dtests.rest.cluster=localhost:9200 \ -Dtests.cluster=localhost:9300 \ -Dtests.clustername=runTask \ -Dtests.analytics.force_routing=true \ -Dtests.analytics.parquet_indices=true \ --tests "org.opensearch.sql.calcite.remote.CalcitePPLSpathCommandIT" ``` Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 2d3f497 commit 2c8bf2b

20 files changed

Lines changed: 1225 additions & 12 deletions

File tree

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/FieldType.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,20 @@ public enum FieldType {
6464
* placeholder; {@link #fromMappingType} keeps working unchanged because no source
6565
* advertises that mapping string.
6666
*/
67-
ARRAY("array");
67+
ARRAY("array"),
68+
69+
/**
70+
* Map-typed expression result. First in-tree producer is PPL `spath`'s auto-extract mode
71+
* (`JSON_EXTRACT_ALL` returns {@code MAP<VARCHAR, VARCHAR>}). Mapping string is {@code
72+
* "map"} as a placeholder — no OpenSearch storage format declares this mapping today, so
73+
* {@link #fromMappingType} never resolves to it through the mapping path; columns reach
74+
* MAP only through {@link #fromSqlTypeName}. Capability registrations for filter / project
75+
* operators on MAP columns are intentionally minimal: callers (e.g. PPL `where doc.user.name`)
76+
* always wrap the MAP column in an ITEM lookup whose result type is the map's value type,
77+
* so the EQUALS / sort / aggregate operators see the value-level type by the time the
78+
* runtime executes them.
79+
*/
80+
MAP("map");
6881

6982
private final String mappingType;
7083

@@ -127,6 +140,7 @@ public static FieldType fromSqlTypeName(SqlTypeName sqlTypeName) {
127140
case BOOLEAN -> FieldType.BOOLEAN;
128141
case BINARY, VARBINARY -> FieldType.BINARY;
129142
case ARRAY -> FieldType.ARRAY;
143+
case MAP -> FieldType.MAP;
130144
default -> null;
131145
};
132146
}

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/ScalarFunction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ public enum ScalarFunction {
233233
JSON_DELETE(Category.SCALAR, SqlKind.OTHER_FUNCTION),
234234
JSON_EXTEND(Category.SCALAR, SqlKind.OTHER_FUNCTION),
235235
JSON_EXTRACT(Category.SCALAR, SqlKind.OTHER_FUNCTION),
236+
JSON_EXTRACT_ALL(Category.SCALAR, SqlKind.OTHER_FUNCTION),
236237
JSON_KEYS(Category.SCALAR, SqlKind.OTHER_FUNCTION),
237238
JSON_SET(Category.SCALAR, SqlKind.OTHER_FUNCTION),
238239

0 commit comments

Comments
 (0)