Skip to content

Commit 9f6aef8

Browse files
committed
spath: Arrow Map response marshalling + parquet-backed test indices
Two complementary changes for closing PPL `spath` parity on the analytics-engine route. Pairs with opensearch-project/OpenSearch#21664 (the analytics-engine `json_extract_all` UDF + ITEM-on-MAP dispatch + MAP capability registrations). Both PRs together take `CalcitePPLSpathCommandIT` on the analytics-engine route from 0 / 16 to 16 / 16 without regressing the v2 / Calcite path (still 16 / 16). ## Pass rate | IT | Route | Before | After | |---|---|---|---| | `CalcitePPLSpathCommandIT` | analytics-engine (`-Dtests.analytics.force_routing=true -Dtests.analytics.parquet_indices=true`) | 0 / 16 | **16 / 16** | | `CalcitePPLSpathCommandIT` | default v2 / Calcite (no flags) | 16 / 16 | 16 / 16 (no regression) | The analytics-route number depends on this PR + #21664 landing together; neither PR alone moves it off 0 / 16. ## Changes 1. **`core/.../ExprValueUtils.java`** — `fromObjectValue` gains a FQN-keyed branch for `org.apache.arrow.vector.util.Text` (decoded via `toString()`). Arrow's MapVector / StructVector emit values as `Text` (a UTF-8 byte-buffer wrapper that does NOT implement `CharSequence`), which none of the typed `instanceof` branches recognized. Without this, any UDF returning `Map<Utf8, Utf8>` through the analytics-engine route surfaces as `ExpressionEvaluationException: unsupported object class org.apache.arrow.vector.util.Text`. FQN match keeps `core/` free of an Arrow dependency. 2. **`integ-test/.../CalcitePPLSpathCommandIT.java`** — refactor `init()` to use `TestUtils.createIndexByRestClient` with an explicit keyword mapping for each of the four test indices (`test_spath`, `test_spath_auto`, `test_spath_cmd`, `test_spath_null`) before the per-doc PUTs. Without an explicit createIndex, the dynamic-mapping route bypasses the `tests.analytics.parquet_indices=true` parquet injection (because the toggle only fires inside `TestUtils.createIndexByRestClient`) and the analytics-engine fragment driver then fails with `UnsupportedOperationException: acquireReader is not supported in EngineBackedIndexer` for any test that reaches the runtime — `testSimpleSpath` was the only test affected before this PR (the other 15 failed earlier at the planner capability check). Idempotency via `TestUtils.isIndexExist` so the cluster-reuse pattern between `@Test` methods keeps working. No change for the v2 / Calcite path (the helper is a no-op for non-parquet runs). Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 1efb6c3 commit 9f6aef8

2 files changed

Lines changed: 91 additions & 32 deletions

File tree

core/src/main/java/org/opensearch/sql/data/model/ExprValueUtils.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,17 @@ public static ExprValue nullValue() {
111111
return ExprNullValue.of();
112112
}
113113

114+
private static final String ARROW_TEXT_CLASS_NAME = "org.apache.arrow.vector.util.Text";
115+
116+
/**
117+
* Whether {@code o} is an Arrow {@code Text} (the UTF-8 byte-buffer wrapper that arrow's Map /
118+
* Struct / List vectors emit for string values). FQN match keeps {@code core/} free of an Arrow
119+
* dependency.
120+
*/
121+
private static boolean isArrowText(Object o) {
122+
return o != null && ARROW_TEXT_CLASS_NAME.equals(o.getClass().getName());
123+
}
124+
114125
/** Construct ExprValue from Object. */
115126
public static ExprValue fromObjectValue(Object o) {
116127
if (null == o) {
@@ -143,6 +154,18 @@ public static ExprValue fromObjectValue(Object o) {
143154
return new ExprDoubleValue(d);
144155
} else if (o instanceof String) {
145156
return stringValue((String) o);
157+
} else if (isArrowText(o)) {
158+
// Arrow MapVector / StructVector yields values as
159+
// `org.apache.arrow.vector.util.Text` — a UTF-8 byte-buffer wrapper that
160+
// does NOT implement CharSequence and therefore wouldn't match any of the
161+
// typed branches above. `Text.toString()` decodes to a real Java String.
162+
// Matched by FQN rather than instanceof so `core/` doesn't acquire an
163+
// Arrow dependency for one type-system bridge. Without this branch the
164+
// analytics-engine route surfaces `ExpressionEvaluationException:
165+
// unsupported object class org.apache.arrow.vector.util.Text` from any
166+
// UDF returning Map<Utf8, Utf8> (first such UDF is `json_extract_all`
167+
// powering PPL `spath`).
168+
return stringValue(o.toString());
146169
} else if (o instanceof Float f) {
147170
if (!Float.isFinite(f)) {
148171
return LITERAL_NULL;

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLSpathCommandIT.java

Lines changed: 68 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,29 @@
1515
import org.json.JSONObject;
1616
import org.junit.jupiter.api.Test;
1717
import org.opensearch.client.Request;
18+
import org.opensearch.sql.legacy.TestUtils;
1819
import org.opensearch.sql.ppl.PPLIntegTestCase;
1920

2021
public class CalcitePPLSpathCommandIT extends PPLIntegTestCase {
22+
// Pin the doc fields as keyword so the analytics-engine compatibility run
23+
// (tests.analytics.parquet_indices=true) provisions each test index as a
24+
// parquet-backed composite store with parquet-friendly types. Without an
25+
// explicit createIndex, the dynamic-mapping route auto-creates Lucene-only
26+
// indices that DataFusion cannot acquireReader on, so the analytics-engine
27+
// fragment driver fails with `UnsupportedOperationException: acquireReader
28+
// is not supported in EngineBackedIndexer` before any spath function fires.
29+
private static final String SIMPLE_DOC_MAPPING =
30+
"{\"mappings\":{\"properties\":{\"doc\":{\"type\":\"keyword\"}}}}";
31+
32+
private static final String AUTO_DOC_MAPPING =
33+
"{\"mappings\":{\"properties\":{"
34+
+ "\"nested_doc\":{\"type\":\"keyword\"},"
35+
+ "\"array_doc\":{\"type\":\"keyword\"},"
36+
+ "\"merge_doc\":{\"type\":\"keyword\"},"
37+
+ "\"stringify_doc\":{\"type\":\"keyword\"},"
38+
+ "\"empty_doc\":{\"type\":\"keyword\"},"
39+
+ "\"malformed_doc\":{\"type\":\"keyword\"}}}}";
40+
2141
@Override
2242
public void init() throws Exception {
2343
super.init();
@@ -26,48 +46,64 @@ public void init() throws Exception {
2646
loadIndex(Index.BANK);
2747

2848
// Simple JSON docs for path-based extraction
29-
Request request1 = new Request("PUT", "/test_spath/_doc/1?refresh=true");
30-
request1.setJsonEntity("{\"doc\": \"{\\\"n\\\": 1}\"}");
31-
client().performRequest(request1);
49+
if (!TestUtils.isIndexExist(client(), "test_spath")) {
50+
TestUtils.createIndexByRestClient(client(), "test_spath", SIMPLE_DOC_MAPPING);
51+
52+
Request request1 = new Request("PUT", "/test_spath/_doc/1?refresh=true");
53+
request1.setJsonEntity("{\"doc\": \"{\\\"n\\\": 1}\"}");
54+
client().performRequest(request1);
3255

33-
Request request2 = new Request("PUT", "/test_spath/_doc/2?refresh=true");
34-
request2.setJsonEntity("{\"doc\": \"{\\\"n\\\": 2}\"}");
35-
client().performRequest(request2);
56+
Request request2 = new Request("PUT", "/test_spath/_doc/2?refresh=true");
57+
request2.setJsonEntity("{\"doc\": \"{\\\"n\\\": 2}\"}");
58+
client().performRequest(request2);
3659

37-
Request request3 = new Request("PUT", "/test_spath/_doc/3?refresh=true");
38-
request3.setJsonEntity("{\"doc\": \"{\\\"n\\\": 3}\"}");
39-
client().performRequest(request3);
60+
Request request3 = new Request("PUT", "/test_spath/_doc/3?refresh=true");
61+
request3.setJsonEntity("{\"doc\": \"{\\\"n\\\": 3}\"}");
62+
client().performRequest(request3);
63+
}
4064

4165
// Auto-extract mode: flatten rules and edge cases (empty, malformed)
42-
Request autoExtractDoc = new Request("PUT", "/test_spath_auto/_doc/1?refresh=true");
43-
autoExtractDoc.setJsonEntity(
44-
"{\"nested_doc\": \"{\\\"user\\\":{\\\"name\\\":\\\"John\\\"}}\","
45-
+ " \"array_doc\": \"{\\\"tags\\\":[\\\"java\\\",\\\"sql\\\"]}\","
46-
+ " \"merge_doc\": \"{\\\"a\\\":{\\\"b\\\":1},\\\"a.b\\\":2}\","
47-
+ " \"stringify_doc\": \"{\\\"n\\\":30,\\\"b\\\":true,\\\"x\\\":null}\","
48-
+ " \"empty_doc\": \"{}\","
49-
+ " \"malformed_doc\": \"{\\\"user\\\":{\\\"name\\\":\"}");
50-
client().performRequest(autoExtractDoc);
66+
if (!TestUtils.isIndexExist(client(), "test_spath_auto")) {
67+
TestUtils.createIndexByRestClient(client(), "test_spath_auto", AUTO_DOC_MAPPING);
68+
69+
Request autoExtractDoc = new Request("PUT", "/test_spath_auto/_doc/1?refresh=true");
70+
autoExtractDoc.setJsonEntity(
71+
"{\"nested_doc\": \"{\\\"user\\\":{\\\"name\\\":\\\"John\\\"}}\","
72+
+ " \"array_doc\": \"{\\\"tags\\\":[\\\"java\\\",\\\"sql\\\"]}\","
73+
+ " \"merge_doc\": \"{\\\"a\\\":{\\\"b\\\":1},\\\"a.b\\\":2}\","
74+
+ " \"stringify_doc\": \"{\\\"n\\\":30,\\\"b\\\":true,\\\"x\\\":null}\","
75+
+ " \"empty_doc\": \"{}\","
76+
+ " \"malformed_doc\": \"{\\\"user\\\":{\\\"name\\\":\"}");
77+
client().performRequest(autoExtractDoc);
78+
}
5179

5280
// Auto-extract mode: 2-doc index for spath + command (eval/where/stats/sort) tests
53-
Request cmdDoc1 = new Request("PUT", "/test_spath_cmd/_doc/1?refresh=true");
54-
cmdDoc1.setJsonEntity(
55-
"{\"doc\": \"{\\\"user\\\":{\\\"name\\\":\\\"John\\\",\\\"age\\\":30}}\"}");
56-
client().performRequest(cmdDoc1);
81+
if (!TestUtils.isIndexExist(client(), "test_spath_cmd")) {
82+
TestUtils.createIndexByRestClient(client(), "test_spath_cmd", SIMPLE_DOC_MAPPING);
5783

58-
Request cmdDoc2 = new Request("PUT", "/test_spath_cmd/_doc/2?refresh=true");
59-
cmdDoc2.setJsonEntity(
60-
"{\"doc\": \"{\\\"user\\\":{\\\"name\\\":\\\"Alice\\\",\\\"age\\\":25}}\"}");
61-
client().performRequest(cmdDoc2);
84+
Request cmdDoc1 = new Request("PUT", "/test_spath_cmd/_doc/1?refresh=true");
85+
cmdDoc1.setJsonEntity(
86+
"{\"doc\": \"{\\\"user\\\":{\\\"name\\\":\\\"John\\\",\\\"age\\\":30}}\"}");
87+
client().performRequest(cmdDoc1);
88+
89+
Request cmdDoc2 = new Request("PUT", "/test_spath_cmd/_doc/2?refresh=true");
90+
cmdDoc2.setJsonEntity(
91+
"{\"doc\": \"{\\\"user\\\":{\\\"name\\\":\\\"Alice\\\",\\\"age\\\":25}}\"}");
92+
client().performRequest(cmdDoc2);
93+
}
6294

6395
// Auto-extract mode: null input handling (doc 1 establishes mapping, doc 2 has null)
64-
Request nullDoc1 = new Request("PUT", "/test_spath_null/_doc/1?refresh=true");
65-
nullDoc1.setJsonEntity("{\"doc\": \"{\\\"n\\\": 1}\"}");
66-
client().performRequest(nullDoc1);
96+
if (!TestUtils.isIndexExist(client(), "test_spath_null")) {
97+
TestUtils.createIndexByRestClient(client(), "test_spath_null", SIMPLE_DOC_MAPPING);
98+
99+
Request nullDoc1 = new Request("PUT", "/test_spath_null/_doc/1?refresh=true");
100+
nullDoc1.setJsonEntity("{\"doc\": \"{\\\"n\\\": 1}\"}");
101+
client().performRequest(nullDoc1);
67102

68-
Request nullDoc2 = new Request("PUT", "/test_spath_null/_doc/2?refresh=true");
69-
nullDoc2.setJsonEntity("{\"doc\": null}");
70-
client().performRequest(nullDoc2);
103+
Request nullDoc2 = new Request("PUT", "/test_spath_null/_doc/2?refresh=true");
104+
nullDoc2.setJsonEntity("{\"doc\": null}");
105+
client().performRequest(nullDoc2);
106+
}
71107
}
72108

73109
@Test

0 commit comments

Comments
 (0)