
Commit 6c7d6ff

[QA] Add AppendPipeCommandIT and TableCommandIT for the analytics-engine REST path (#21526)
* [QA] Add AppendPipeCommandIT for the analytics-engine REST path

PPL `appendpipe` already passes 4/4 of its v2-side `CalcitePPLAppendPipeCommandIT` cases on the analytics-engine route under `tests.analytics.force_routing=true`: the existing capability surface (LogicalUnion + LogicalAggregate over SUM, plus the SchemaUnifier type-conflict path) is sufficient. No code changes in core; this PR just lands a self-contained QA IT so the analytics-engine path can be verified inside core without cross-plugin dependencies.

Three tests, mirroring the v2-side surface that exercises the three distinct shapes of `appendpipe`:

* `testAppendPipeSort`: duplicates the post-stats stream and re-sorts the duplicate inline. Exercises Union over identical schemas. 5 of 6 rows are kept by `head 5`; the multiset overlap is fine because the outer `sort str0` pins the original branch's order and the duplicate's `sort -sum_int0_by_str0` pins the inner branch's order.
* `testAppendPipeWithMergedColumn`: duplicates the post-stats stream and collapses it via an inner `stats sum(sum) as sum`. Exercises SchemaUnifier merging the str0-bearing original branch with the inner branch's single row that carries only `sum`. The two branches arrive at the coordinator's Union in non-deterministic order, hence the multiset comparison.
* `testAppendPipeWithConflictTypeColumn`: the inner pipeline rewrites the same-named column to a different type. Exercises the SchemaUnifier validation path that surfaces "due to incompatible types" before execution.

Reuses the existing `calcs` parquet-backed dataset via `DatasetProvisioner` (no new fixtures). `testAppendDifferentIndex` from the v2-side IT is intentionally not ported: it exercises `append` (a separate `source=...` sub-search) and is already covered by `AppendCommandIT`.

## Test plan

* `./gradlew :sandbox:qa:analytics-engine-rest:integTest -Dsandbox.enabled=true --tests "*AppendPipeCommandIT"`: 3/3 green
* `./gradlew :sandbox:qa:analytics-engine-rest:check -Dsandbox.enabled=true`: green
* Verified end-to-end: 56 force_routing transitions, 63 analytics-engine PlannerImpl entries, and 0 v2 PPLService.Incoming entries during a sweep including this IT.

## Out of scope

* PPL `multisearch` was originally bundled with this PR. Triage showed ~12 of its 15 analytics-route failures are blocked on a Substrait-side issue: DataFusion's substrait consumer rejects the Plan emitted for `LogicalUnion(StageInputScan, StageInputScan)` with `Names list must match exactly to nested schema, but found N uses for M names`. The root cause is not diagnosed yet (the registered child-stage schema width and the `Plan.Root.names` width disagree somewhere between `LocalStageScheduler.buildChildInputs` and DataFusion's `make_renamed_schema`). Out of scope here; tracked for a follow-up PR after deeper investigation.
* Aggregation, the TIMESTAMP/DATE type system, eval-predicate scalars, and PPL `span` follow the same scope discipline as #21521: each is its own work track and not addressed here.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* [QA] Add TableCommandIT for the analytics-engine REST path

`table` is a syntactic alias of `fields`: the v2 AstBuilder dispatches both through `buildProjectCommand` once `plugins.calcite.enabled=true` is visible to the AstBuilder via the UnifiedQueryContext. The mirror fix landed in opensearch-project/sql#5413; this commit closes the gap on the test-ppl-frontend side, where the UnifiedQueryContext is constructed locally rather than coming from the SQL plugin.

Two changes:

- `UnifiedQueryService` now sets `plugins.calcite.enabled=true` on the context. The unified path is Calcite-based by definition; without this flag the AstBuilder rejects table/regex/rex/convert.
- New `TableCommandIT` covering the surfaces specific to the `table` keyword: comma-delimited, space-delimited, suffix wildcard, leading-`-` exclusion, and `fields` ↔ `table` equivalence on identical inputs. Plain projection semantics already covered by FieldsCommandIT are not duplicated.

Validates: 5/5 TableCommandIT pass, 3/3 AppendPipeCommandIT still pass.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

---------

Signed-off-by: Kai Huang <ahkcs@amazon.com>
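The non-deterministic branch ordering called out above is why the IT compares rows as a multiset rather than positionally. A standalone sketch of that greedy matching, simplified relative to the IT (it uses plain `equals()` without the numeric-widening tolerance the IT's `rowsEqual` applies):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Greedy multiset matching: each expected row consumes at most one actual row,
// so duplicated rows must appear the same number of times on both sides.
class MultisetMatch {

    static List<Object> row(Object... values) {
        return Arrays.asList(values);
    }

    static boolean matches(List<List<Object>> expected, List<List<Object>> actual) {
        if (expected.size() != actual.size()) {
            return false;
        }
        // Copy so each successful match removes its row exactly once.
        List<List<Object>> remaining = new ArrayList<>(actual);
        for (List<Object> want : expected) {
            if (!remaining.remove(want)) { // List.remove(Object) matches via equals()
                return false;
            }
        }
        return true;
    }
}
```

Because a matched row is removed from `remaining`, a duplicate expected row only passes when the actual output contains it the same number of times.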
1 parent 50125c5 commit 6c7d6ff

3 files changed

Lines changed: 380 additions & 0 deletions

File tree

sandbox/plugins/test-ppl-frontend/src/main/java/org/opensearch/ppl/action/UnifiedQueryService.java

Lines changed: 5 additions & 0 deletions
@@ -80,6 +80,11 @@ protected Map<String, Table> getTableMap() {
             .language(QueryType.PPL)
             .catalog(DEFAULT_CATALOG, flatSchema)
             .defaultNamespace(DEFAULT_CATALOG)
+            // The unified PPL parser reuses the v2 AstBuilder, which gates Calcite-only
+            // commands (table, regex, rex, convert) on plugins.calcite.enabled. The unified
+            // path is by definition Calcite-based; flag it on so those commands lower
+            // through the same Project/Filter RelNodes as their non-aliased counterparts.
+            .setting("plugins.calcite.enabled", true)
             .build()
         ) {
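For illustration only, a minimal, hypothetical sketch of the gating the comment above describes: Calcite-only commands are admitted only when the `plugins.calcite.enabled` setting is present and true on the context. The class and method names here are illustrative, not the actual v2 AstBuilder API.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch (illustrative names only): Calcite-only commands pass
// only when plugins.calcite.enabled is set to true; all other commands pass
// unconditionally.
class CommandGate {

    private static final Set<String> CALCITE_ONLY = Set.of("table", "regex", "rex", "convert");

    private final Map<String, Object> settings;

    CommandGate(Map<String, Object> settings) {
        this.settings = settings;
    }

    boolean allows(String command) {
        if (!CALCITE_ONLY.contains(command)) {
            return true; // plain v2 commands are always accepted
        }
        return Boolean.TRUE.equals(settings.get("plugins.calcite.enabled"));
    }
}
```

Without the flag, `allows("table")` is false, which mirrors the "AstBuilder rejects table/regex/rex/convert" behavior the commit message describes.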

Lines changed: 232 additions & 0 deletions
AppendPipeCommandIT.java (new file)

@@ -0,0 +1,232 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.analytics.qa;

import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * Self-contained integration test for PPL {@code appendpipe} on the analytics-engine route.
 *
 * <p>Mirrors {@code CalcitePPLAppendPipeCommandIT} from the {@code opensearch-project/sql}
 * repository so the analytics-engine path can be verified inside core without cross-plugin
 * dependencies. Each test sends a PPL query through {@code POST /_analytics/ppl} (exposed
 * by the {@code test-ppl-frontend} plugin), which runs the same {@code UnifiedQueryPlanner}
 * → {@code CalciteRelNodeVisitor} → Substrait → DataFusion pipeline as the SQL plugin's
 * force-routed analytics path.
 *
 * <p>{@code appendpipe} differs from {@code append} (covered by {@link AppendCommandIT}):
 * {@code appendpipe [pipeline]} duplicates the current intermediate result, applies the
 * inline {@code [pipeline]} to the duplicate, and appends the duplicate's output to the
 * original. {@code append [search]} runs an entirely separate sub-query and unions its
 * output. Both lower to a Calcite {@code LogicalUnion}, but the upper-stage shape differs
 * because {@code appendpipe} reuses the original's row stream as its input rather than
 * starting a fresh {@code source=...}.
 *
 * <p>Provisions the {@code calcs} dataset once. {@link AnalyticsRestTestCase#preserveIndicesUponCompletion()}
 * keeps it across test methods.
 */
public class AppendPipeCommandIT extends AnalyticsRestTestCase {

    private static final Dataset DATASET = new Dataset("calcs", "calcs");

    private static boolean dataProvisioned = false;

    private void ensureDataProvisioned() throws IOException {
        if (dataProvisioned == false) {
            DatasetProvisioner.provision(client(), DATASET);
            dataProvisioned = true;
        }
    }

    // ── duplicate + inline sort, then head ──────────────────────────────────────

    public void testAppendPipeSort() throws IOException {
        // Branch: stats sum(int0) by str0 → 3 rows (FURNITURE=1, OFFICE SUPPLIES=18, TECHNOLOGY=49).
        // Outer `sort str0` pins the original to alphabetical order. `appendpipe [sort -sum_int0_by_str0]`
        // duplicates the 3 rows and re-sorts them descending, then appends. `head 5` keeps the first
        // 5 of the 6 total rows: original 3 + first 2 of the descending duplicate.
        assertRows(
            "source="
                + DATASET.indexName
                + " | stats sum(int0) as sum_int0_by_str0 by str0 | sort str0"
                + " | appendpipe [ sort -sum_int0_by_str0 ]"
                + " | head 5",
            row(1, "FURNITURE"),
            row(18, "OFFICE SUPPLIES"),
            row(49, "TECHNOLOGY"),
            row(49, "TECHNOLOGY"),
            row(18, "OFFICE SUPPLIES")
        );
    }

    // ── duplicate + inline stats producing a smaller schema (merged column) ─────

    public void testAppendPipeWithMergedColumn() throws IOException {
        // Outer stats: sum(int0) by str0 → 3 rows. `appendpipe [stats sum(sum) as sum]` runs an inner
        // stats over the duplicate, collapsing it to a single row carrying only the `sum` column.
        // Schema unification keeps both the original branch's `str0` and the inner branch's
        // `sum` column; the inner row is null-padded for the missing `str0`. The two branches
        // arrive at the coordinator's union in non-deterministic order (each is its own data-node
        // stage), so compare as a multiset rather than positionally.
        assertRowsAnyOrder(
            "source="
                + DATASET.indexName
                + " | stats sum(int0) as sum by str0 | sort str0"
                + " | appendpipe [ stats sum(sum) as sum ]",
            row(1, "FURNITURE"),
            row(18, "OFFICE SUPPLIES"),
            row(49, "TECHNOLOGY"),
            row(68, null)
        );
    }

    // ── duplicate + inline cast that clashes with the original's column type ───

    public void testAppendPipeWithConflictTypeColumn() {
        // Branch 1 produces `sum` as BIGINT (sum over int0). The inner pipeline of
        // `appendpipe [eval sum = cast(sum as double)]` rewrites the same-named column to
        // DOUBLE. SchemaUnifier refuses to merge the diverging types and surfaces a
        // planner-side validation error before execution.
        assertErrorContains(
            "source="
                + DATASET.indexName
                + " | stats sum(int0) as sum by str0 | sort str0"
                + " | appendpipe [ eval sum = cast(sum as double) ]"
                + " | head 5",
            "due to incompatible types"
        );
    }

    // ── helpers ─────────────────────────────────────────────────────────────────

    private static List<Object> row(Object... values) {
        return Arrays.asList(values);
    }

    /**
     * Multiset comparison: branch ordering at the coordinator's Union is non-deterministic.
     * Used by {@link #testAppendPipeWithMergedColumn} where the original-branch stats output
     * (3 rows) and the inner-branch collapsed-sum (1 row) can arrive in either order.
     */
    @SafeVarargs
    @SuppressWarnings("varargs")
    private final void assertRowsAnyOrder(String ppl, List<Object>... expected) throws IOException {
        Map<String, Object> response = executePpl(ppl);
        @SuppressWarnings("unchecked")
        List<List<Object>> actualRows = (List<List<Object>>) response.get("rows");
        assertNotNull("Response missing 'rows' for query: " + ppl, actualRows);
        assertEquals("Row count mismatch for query: " + ppl, expected.length, actualRows.size());
        java.util.List<List<Object>> remaining = new java.util.ArrayList<>(actualRows);
        outer:
        for (List<Object> want : expected) {
            for (int i = 0; i < remaining.size(); i++) {
                if (rowsEqual(want, remaining.get(i))) {
                    remaining.remove(i);
                    continue outer;
                }
            }
            fail("Expected row not found for query: " + ppl + "; missing: " + want + " in actual: " + actualRows);
        }
    }

    private static boolean rowsEqual(List<Object> a, List<Object> b) {
        if (a.size() != b.size()) return false;
        for (int i = 0; i < a.size(); i++) {
            Object ax = a.get(i);
            Object bx = b.get(i);
            if (ax == null || bx == null) {
                if (ax != bx) return false;
                continue;
            }
            if (ax instanceof Number && bx instanceof Number) {
                if (Double.compare(((Number) ax).doubleValue(), ((Number) bx).doubleValue()) != 0) return false;
                continue;
            }
            if (!ax.equals(bx)) return false;
        }
        return true;
    }

    @SafeVarargs
    @SuppressWarnings("varargs")
    private final void assertRows(String ppl, List<Object>... expected) throws IOException {
        Map<String, Object> response = executePpl(ppl);
        @SuppressWarnings("unchecked")
        List<List<Object>> actualRows = (List<List<Object>>) response.get("rows");
        assertNotNull("Response missing 'rows' for query: " + ppl, actualRows);
        assertEquals("Row count mismatch for query: " + ppl, expected.length, actualRows.size());
        for (int i = 0; i < expected.length; i++) {
            List<Object> want = expected[i];
            List<Object> got = actualRows.get(i);
            assertEquals(
                "Column count mismatch at row " + i + " for query: " + ppl,
                want.size(),
                got.size()
            );
            for (int j = 0; j < want.size(); j++) {
                assertCellEquals(
                    "Cell mismatch at row " + i + ", col " + j + " for query: " + ppl,
                    want.get(j),
                    got.get(j)
                );
            }
        }
    }

    private void assertErrorContains(String ppl, String expectedSubstring) {
        try {
            Map<String, Object> response = executePpl(ppl);
            fail("Expected query to fail with [" + expectedSubstring + "] but got response: " + response);
        } catch (ResponseException e) {
            String body;
            try {
                body = org.opensearch.test.rest.OpenSearchRestTestCase.entityAsMap(e.getResponse()).toString();
            } catch (IOException ioe) {
                body = e.getMessage();
            }
            assertTrue(
                "Expected response body to contain [" + expectedSubstring + "] but was: " + body,
                body.contains(expectedSubstring)
            );
        } catch (IOException e) {
            fail("Unexpected IOException: " + e);
        }
    }

    private Map<String, Object> executePpl(String ppl) throws IOException {
        ensureDataProvisioned();
        Request request = new Request("POST", "/_analytics/ppl");
        request.setJsonEntity("{\"query\": \"" + escapeJson(ppl) + "\"}");
        Response response = client().performRequest(request);
        return assertOkAndParse(response, "PPL: " + ppl);
    }

    private static void assertCellEquals(String message, Object expected, Object actual) {
        if (expected == null || actual == null) {
            assertEquals(message, expected, actual);
            return;
        }
        if (expected instanceof Number && actual instanceof Number) {
            double e = ((Number) expected).doubleValue();
            double a = ((Number) actual).doubleValue();
            if (Double.compare(e, a) != 0) {
                fail(message + ": expected <" + expected + "> but was <" + actual + ">");
            }
            return;
        }
        assertEquals(message, expected, actual);
    }
}
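`executePpl` above embeds the raw PPL text inside a JSON string via an `escapeJson` helper that is not part of this diff (presumably inherited from `AnalyticsRestTestCase`). A minimal sketch of what such a helper must cover — this version is an assumption, not the actual helper, and handles only quotes, backslashes, and control characters:

```java
// Hypothetical minimal escapeJson: escapes the characters that would otherwise
// break or alter a JSON string literal (quote, backslash, control characters).
class JsonEscape {

    static String escapeJson(String s) {
        StringBuilder sb = new StringBuilder(s.length());
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c)); // other control chars
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }
}
```

PPL queries like the ones in this IT contain no quotes or control characters, so they pass through unchanged; the escaping matters once a query embeds a string literal.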
Lines changed: 143 additions & 0 deletions
TableCommandIT.java (new file)

@@ -0,0 +1,143 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.analytics.qa;

import org.opensearch.client.Request;
import org.opensearch.client.Response;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 * Self-contained integration test for PPL {@code table} on the analytics-engine route.
 *
 * <p>{@code table} is a syntactic alias of {@code fields}: the SQL plugin's
 * {@code AstBuilder.visitTableCommand} reuses {@code buildProjectCommand} (the same
 * code path {@code fields} dispatches to) once {@code plugins.calcite.enabled=true} is
 * propagated through the {@code UnifiedQueryContext} (see
 * <a href="https://github.com/opensearch-project/sql/pull/5413">opensearch-project/sql#5413</a>).
 * The added value of {@code table} is a more permissive token shape: it accepts
 * space-delimited field lists, leading-{@code -} exclusion forms, and mixes of those with
 * commas; surfaces {@code fields} doesn't expose.
 *
 * <p>This IT covers the surfaces specific to the {@code table} keyword to lock in that
 * the analytics path lowers them to the same Calcite {@code Project} RelNode as the v2 /
 * Calcite path does. Plain projection semantics (already covered by {@code FieldsCommandIT})
 * are not duplicated here.
 *
 * <p>Reuses the {@code calcs} parquet-backed dataset.
 */
public class TableCommandIT extends AnalyticsRestTestCase {

    private static final Dataset DATASET = new Dataset("calcs", "calcs");

    private static boolean dataProvisioned = false;

    private void ensureDataProvisioned() throws IOException {
        if (dataProvisioned == false) {
            DatasetProvisioner.provision(client(), DATASET);
            dataProvisioned = true;
        }
    }

    public void testTableCommaDelimited() throws IOException {
        // Comma-delimited form: same shape as `fields a, b`. Sanity check that the table
        // keyword reaches buildProjectCommand without falling back to the v2-only error.
        assertColumns(
            "source=" + DATASET.indexName + " | table str0, num0 | head 3",
            "str0",
            "num0"
        );
    }

    public void testTableSpaceDelimited() throws IOException {
        // Space-delimited form: unique to `table`. Validates the lexer accepts whitespace as
        // a separator and the AstBuilder folds the multi-token list into a single Project.
        assertColumns(
            "source=" + DATASET.indexName + " | table str0 num0 int0 | head 3",
            "str0",
            "num0",
            "int0"
        );
    }

    public void testTableSuffixWildcard() throws IOException {
        // *0 expands at parse time to all columns ending in '0'. Identical to
        // FieldsCommandIT.testFieldsSuffixWildcard on the analytics path; pinned here
        // for the `table` lowering specifically. Order is analyzer-dependent, so set-equality.
        Map<String, Object> response = executePpl(
            "source=" + DATASET.indexName + " | table *0 | head 1"
        );
        @SuppressWarnings("unchecked")
        List<String> columns = (List<String>) response.get("columns");
        assertNotNull("Response missing 'columns'", columns);
        java.util.Set<String> actual = new java.util.HashSet<>(columns);
        java.util.Set<String> expected = new java.util.HashSet<>(
            java.util.Arrays.asList("num0", "str0", "int0", "bool0", "date0", "time0", "datetime0")
        );
        assertEquals("Wildcard *0 column set", expected, actual);
    }

    public void testTableMinusExclusion() throws IOException {
        // `table - num0, num1, num2, num3, num4` removes those five columns. The leading
        // minus form is unique to `table`; `fields` uses `fields - a, b, ...` with a
        // comma-separated list (no space-delimiting). Validates the analytics path retains
        // exclusion semantics.
        Map<String, Object> response = executePpl(
            "source=" + DATASET.indexName + " | table - num0, num1, num2, num3, num4 | head 1"
        );
        @SuppressWarnings("unchecked")
        List<String> columns = (List<String>) response.get("columns");
        assertNotNull("Response missing 'columns'", columns);
        for (String name : columns) {
            assertFalse("Excluded column should not appear: " + name, name.startsWith("num"));
        }
    }

    public void testFieldsAndTableEquivalence() throws IOException {
        // Cross-check that `fields a, b, c` and `table a, b, c` produce identical
        // schema + rows. Makes the alias claim explicit at the response level so a
        // future divergence (e.g. `table` accidentally adds a Sort or rewires the
        // Project) is caught here.
        Map<String, Object> fieldsResp = executePpl(
            "source=" + DATASET.indexName + " | fields str0, num0, int0 | head 3"
        );
        Map<String, Object> tableResp = executePpl(
            "source=" + DATASET.indexName + " | table str0, num0, int0 | head 3"
        );
        assertEquals("columns from fields vs table", fieldsResp.get("columns"), tableResp.get("columns"));
        assertEquals("rows from fields vs table", fieldsResp.get("rows"), tableResp.get("rows"));
    }

    // ── helpers ─────────────────────────────────────────────────────────────────

    private void assertColumns(String ppl, String... expectedColumns) throws IOException {
        Map<String, Object> response = executePpl(ppl);
        @SuppressWarnings("unchecked")
        List<String> columns = (List<String>) response.get("columns");
        assertNotNull("Response missing 'columns' for query: " + ppl, columns);
        assertEquals("Column count for query: " + ppl, expectedColumns.length, columns.size());
        for (int i = 0; i < expectedColumns.length; i++) {
            assertEquals(
                "Column at position " + i + " for query: " + ppl,
                expectedColumns[i],
                columns.get(i)
            );
        }
    }

    private Map<String, Object> executePpl(String ppl) throws IOException {
        ensureDataProvisioned();
        Request request = new Request("POST", "/_analytics/ppl");
        request.setJsonEntity("{\"query\": \"" + escapeJson(ppl) + "\"}");
        Response response = client().performRequest(request);
        return assertOkAndParse(response, "PPL: " + ppl);
    }
}
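The suffix-wildcard surface pinned by `testTableSuffixWildcard` reduces to a simple name filter. A sketch of the expansion, assuming it is a parse-time filter over the known column list (the actual analyzer-side mechanics are not shown in this diff):

```java
import java.util.List;
import java.util.stream.Collectors;

// Suffix-wildcard expansion sketch: `*0` keeps every column whose name ends
// in "0", preserving the incoming column order.
class SuffixWildcard {

    static List<String> expand(String pattern, List<String> columns) {
        String suffix = pattern.substring(1); // drop the leading '*'
        return columns.stream()
            .filter(name -> name.endsWith(suffix))
            .collect(Collectors.toList());
    }
}
```

The IT compares the result as a set precisely because the real expansion order is analyzer-dependent, whereas this sketch happens to preserve input order.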
