
Commit 5dd98b5

Enable PPL parse command on the analytics-engine route
Wire PPL `parse <field> '<regex>'` through PPL → Calcite → Substrait → DataFusion. The command lowers to one `ITEM(PARSE(input, regex, "regex"), '<group>')` per named group; PARSE returns a `map<utf8, utf8>` of all named groups, and ITEM extracts each value. Both UDFs sit on the Rust side of the analytics backend.

Highlights:

* New Rust UDFs `parse` and `item` (`sandbox/plugins/analytics-backend-datafusion/rust/src/udf/{parse,item}.rs`). `parse` anchors the user pattern with `^(?:…)$` to honour Java's `Matcher.matches()` semantic that the legacy `RegexExpression` relies on, so a row that doesn't match consumes nothing and every named group yields `""` — the same observable behaviour as the legacy path.
* `ParseAdapter` validates the pattern and method operands as non-null string literals at plan time and gates the method to `regex` (rejects `grok` / `patterns` with a clear error pointing users at the legacy engine until those methods land).
* Framework plumbing: `FieldType.MAP`, `ScalarFunction.PARSE`, `ScalarFunction.ITEM`, `STANDARD_PROJECT_OPS` (ITEM is added; PARSE is registered separately for `FieldType.MAP` because no real OpenSearch mapping is a map and we don't want every scalar registering against the MAP bucket), `FunctionMappings.s` entries for `parse` and `item`, and YAML extension declarations.
* Codec MapVector handling at three sites (`ArrowValues`, `DatafusionResultStream`, `RowResponseCodec`). `MapVector.getObject()` builds a `JsonStringHashMap` whose `<clinit>` references jackson-datatype-jsr310, which is not on the arrow-flight-rpc parent plugin's classloader, so each site reads the offset buffer and key/value sub-vectors directly instead.
* `session_context::create_session_context` now calls `udf::register_all`. The `executeWithContextAsync` fragment path was the only SessionContext creator that wasn't registering OpenSearch UDFs, so any analytics query through that path (the production fragment route) failed with "Unsupported function name". Pre-existing UDFs (`convert_tz`, `to_unixtime`) shared this gap silently because no IT exercised them through the same path.

The `grok` and `patterns` parse methods are deliberately left on the legacy engine. The Rust UDF rejects them with an explicit message; future onboardings will be deliberate flips rather than silent semantics changes.

Verified end-to-end via `CalciteParseCommandIT` under `tests.analytics.force_routing=true`: 7/7 passing (was 4/7 before — only the testParseError* set passed, which throws at AST-builder time before reaching the analytics planner). The +3 delta covers `testParseCommand`, `testParseCommandReplaceOriginalField`, and `testParseCommandWithOtherRunTimeFields`. Sandbox QA `ParseCommandIT` (8/8) covers the same code paths against the analytics path directly, without depending on the SQL plugin worktree.

Signed-off-by: Jialiang Liang <jiallian@amazon.com>
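As a reference for the matching contract described above, here is a minimal Java sketch (the class and helper names are hypothetical, not from the commit) of the behaviour the Rust `parse` UDF mirrors: `Matcher.matches()` requires a full-string match, which is what anchoring the pattern as `^(?:…)$` reproduces on the regex-crate side, and a non-matching row yields `""` for every named group.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ParseSemanticsSketch {
    // Extract named groups from input. A non-matching row yields "" for every
    // group, mirroring Matcher.matches() full-string semantics; the caller is
    // expected to pass group names that actually occur in the pattern.
    static Map<String, String> parse(String input, String rawPattern, List<String> groups) {
        Matcher m = Pattern.compile(rawPattern).matcher(input);
        boolean matched = m.matches(); // full-string match, like ^(?:pattern)$
        Map<String, String> out = new LinkedHashMap<>();
        for (String g : groups) {
            String v = matched ? m.group(g) : null;
            out.put(g, v == null ? "" : v); // non-match -> "" per group
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> groups = List.of("host", "code");
        String pattern = "(?<host>\\w+) (?<code>\\d+)";
        System.out.println(parse("web01 200", pattern, groups));       // {host=web01, code=200}
        System.out.println(parse("web01 200 extra", pattern, groups)); // {host=, code=}
    }
}
```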
1 parent 6aa070b commit 5dd98b5

15 files changed

Lines changed: 1432 additions & 8 deletions


sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/FieldType.java

Lines changed: 11 additions & 1 deletion
@@ -56,6 +56,8 @@ public enum FieldType {
     OBJECT("object"),
     FLAT_OBJECT("flat_object"),
     COMPLETION("completion"),
+
+    // ── Composite ────────────────────────────────────────────────────
     /**
      * Array-typed expression result. Used for the return-type slot of array-producing scalar
      * functions (PPL {@code array(…)}, {@code array_slice}, {@code array_distinct}). Has no
@@ -64,7 +66,14 @@ public enum FieldType {
      * placeholder; {@link #fromMappingType} keeps working unchanged because no source
      * advertises that mapping string.
      */
-    ARRAY("array");
+    ARRAY("array"),
+    /**
+     * Models Calcite MAP return types for scalar functions such as PPL {@code parse}
+     * (parse → {@code map<varchar, varchar>} of named groups). No corresponding OpenSearch
+     * mapping type; {@link #fromMappingType} won't match {@code "map"} because no real OS
+     * mapping uses that name.
+     */
+    MAP("map");

     private final String mappingType;

@@ -127,6 +136,7 @@ public static FieldType fromSqlTypeName(SqlTypeName sqlTypeName) {
         case BOOLEAN -> FieldType.BOOLEAN;
         case BINARY, VARBINARY -> FieldType.BINARY;
         case ARRAY -> FieldType.ARRAY;
+        case MAP -> FieldType.MAP;
         default -> null;
     };
 }

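For illustration, a compilable sketch of the `fromSqlTypeName` extension above. The enum below is a simplified stand-in (the name is hypothetical, `SqlTypeName` is replaced by plain strings, and only a few variants are modelled); it only shows the shape of the new switch arm.

```java
// Simplified model of the FieldType switch: MAP now resolves for Calcite
// MAP return types (e.g. parse), while unknown SQL types still fall through
// to null. fromMappingType is untouched because no OS mapping type is "map".
enum FieldTypeSketch {
    BOOLEAN, BINARY, ARRAY, MAP;

    static FieldTypeSketch fromSqlTypeName(String sqlTypeName) {
        return switch (sqlTypeName) {
            case "BOOLEAN" -> BOOLEAN;
            case "BINARY", "VARBINARY" -> BINARY;
            case "ARRAY" -> ARRAY;
            case "MAP" -> MAP;   // new case: Calcite MAP results, e.g. parse
            default -> null;     // unknown SQL types stay unmapped
        };
    }

    public static void main(String[] args) {
        System.out.println(fromSqlTypeName("MAP"));      // MAP
        System.out.println(fromSqlTypeName("GEOMETRY")); // null
    }
}
```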
sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/ScalarFunction.java

Lines changed: 9 additions & 0 deletions
@@ -89,6 +89,15 @@ public enum ScalarFunction {
     NUMBER_TO_STRING(Category.STRING, SqlKind.OTHER_FUNCTION), // Alias for TOSTRING
     TONUMBER(Category.STRING, SqlKind.OTHER_FUNCTION),
     STRCMP(Category.STRING, SqlKind.OTHER_FUNCTION),
+    /**
+     * PPL {@code parse <field> '<regex>'} — extracts named regex groups into a
+     * {@code MAP<VARCHAR, VARCHAR>}. Resolves by identifier-name through
+     * {@link #fromSqlFunction(SqlFunction)} ({@code SqlKind.OTHER_FUNCTION}
+     * shared with many scalar UDFs). Pairs with {@link #ITEM} downstream:
+     * {@code parse} returns the map, {@code item(map, group)} extracts each
+     * named group at the call site.
+     */
+    PARSE(Category.STRING, SqlKind.OTHER_FUNCTION),

     // ── Cryptographic hash ─────────────────────────────────────────────
     // md5(x), sha1(x), sha2(x, bitLen) with bitLen ∈ {224,256,384,512}, crc32(x).

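The lowering described in the commit message (one `ITEM(PARSE(input, regex, "regex"), '<group>')` projection per named group) can be sketched as follows. The class, method, and the exact rendering of the projection strings are illustrative only, not the planner's real output format; the named-group scan mirrors Java's `(?<name>...)` syntax.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ParseLoweringSketch {
    // Named-group declarations inside the user pattern: (?<name>...).
    // Java group names start with a letter and contain only letters/digits.
    private static final Pattern GROUP_DECL =
        Pattern.compile("\\(\\?<([A-Za-z][A-Za-z0-9]*)>");

    // Emit one projection expression per named group found in the literal.
    static List<String> lower(String field, String regexLiteral) {
        List<String> projections = new ArrayList<>();
        Matcher m = GROUP_DECL.matcher(regexLiteral);
        while (m.find()) {
            projections.add("ITEM(PARSE(" + field + ", '" + regexLiteral
                + "', \"regex\"), '" + m.group(1) + "')");
        }
        return projections;
    }

    public static void main(String[] args) {
        // Two named groups -> two ITEM(PARSE(...)) projections.
        lower("message", "(?<host>\\w+) (?<code>\\d+)").forEach(System.out::println);
    }
}
```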
sandbox/plugins/analytics-backend-datafusion/rust/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ serde_json = { workspace = true, features = ["preserve_order"] }
 # multi-path JSON-array output. Moving to 1.x is a follow-up once we can
 # reproduce that distinction against the new API surface.
 jsonpath-rust = "=0.7.5"
-# mvfind UDF — regex matching against stringified array elements
+# mvfind / parse UDFs — regex matching
 regex = "=1.12.3"

 # Cryptographic hash UDFs. `sha1` is a dedicated crate (RFC 3174); DataFusion

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs

Lines changed: 5 additions & 4 deletions
@@ -116,10 +116,11 @@ pub async unsafe fn create_session_context(
         .build();

     let ctx = SessionContext::new_with_state(state);
-    // Register OpenSearch UDFs (mvappend, mvfind, mvzip, convert_tz, …) on this session
-    // so the substrait converter at execute_with_context can resolve their function names.
-    // Without this, fragment execution fails with "Unsupported function name" because
-    // df_execute_with_context reuses this handle's ctx instead of building a fresh one.
+    // Register OpenSearch UDFs (parse, item, mvappend, mvfind, mvzip, convert_tz, …)
+    // on this session so the substrait converter at execute_with_context can resolve
+    // their function names. Without this, fragment execution fails with "Unsupported
+    // function name" because df_execute_with_context reuses this handle's ctx instead
+    // of building a fresh one.
     crate::udf::register_all(&ctx);

     // Register default ListingTable for parquet scans

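The failure mode this registration fixes can be illustrated with a toy name-to-implementation registry (a hypothetical class, not the DataFusion API): resolving a UDF name that was never registered is exactly the "Unsupported function name" situation the fragment path hit before `register_all` was wired in.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy UDF registry: resolution fails for any name that register() never saw,
// modelling a SessionContext created without udf::register_all.
public class UdfRegistrySketch {
    private final Map<String, Function<String[], String>> udfs = new HashMap<>();

    void register(String name, Function<String[], String> impl) {
        udfs.put(name, impl);
    }

    Function<String[], String> resolve(String name) {
        Function<String[], String> f = udfs.get(name);
        if (f == null) {
            throw new IllegalStateException("Unsupported function name: " + name);
        }
        return f;
    }

    public static void main(String[] args) {
        UdfRegistrySketch ctx = new UdfRegistrySketch();
        try {
            ctx.resolve("parse"); // fails: nothing registered on this context yet
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Unsupported function name: parse
        }
        // What a register_all-style hook does per UDF before any query runs:
        ctx.register("parse", argv -> "map<utf8,utf8>");
        System.out.println(ctx.resolve("parse") != null); // true
    }
}
```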