Commit dcb68f9

Wire PPL where command through the analytics-engine path (opensearch-project#21502)
* Wire DataFusion backend for the PPL where command

Three orthogonal compatibility gaps surface when CalciteWhereCommandIT runs through the analytics-engine route. Address them in the DataFusion backend:

1. CAST as a project capability. ReduceExpressionsRule.ProjectReduceExpressionsRule constant-folds field references through equality filters into typed literals: `where str0 = 'FURNITURE' | fields str0` becomes `Project[CAST('FURNITURE' AS VARCHAR)]`. Without CAST in STANDARD_PROJECT_OPS, OpenSearchProjectRule throws "No backend supports scalar function [CAST]". 5 where-command tests previously broken; substrait isthmus and DataFusion handle CAST natively.

2. Comparison ops as project capabilities. PPL `eval x = (a == b)` produces a LogicalProject whose projected expression is the comparison itself (returning boolean). EQUALS/NOT_EQUALS/GT/GE/LT/LE were declared filter-only; add them to STANDARD_PROJECT_OPS so projections of boolean expressions are accepted. Fixes testDoubleEqualInEvalCommand.

3. ILIKE adapter. Substrait isthmus only knows the standard SQL LIKE operator; the Calcite-specific ILIKE has no extension. PPL emits ILIKE for the `contains` operator and for `LIKE` when plugins.ppl.syntax.legacy.preferred is true (its default), affecting six where-command tests with "Unable to convert call ILIKE(...)". Additionally, PPLFuncImpTable always passes an explicit '\\' escape literal of type CHAR(1), which substrait's like signature rejects as "Unable to convert call LIKE(string?, char<N>, char<1>)". The new IlikeFunctionAdapter handles both: ILIKE rewrites to LIKE(LOWER(field), LOWER(pattern)) (wildcards and the escape character survive Character.toLowerCase unchanged, preserving case-insensitive semantics), and the 3-arg form is reduced to 2-arg by dropping the default-only escape, since PPL never emits a non-default escape (the contains operator pre-escapes user-provided literals before composing the wildcard pattern). Both LOWER and 2-arg LIKE are natively supported by DataFusion. Registered for ScalarFunction.LIKE because Calcite's SqlLibraryOperators.ILIKE shares SqlKind.LIKE with the standard LIKE operator and resolves to the same adapter slot; the adapter checks the operator name to discriminate.

Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Add WhereCommandIT QA test for the analytics-engine route

Self-contained REST integration test under sandbox/qa/analytics-engine-rest mirroring the surface exercised by CalciteWhereCommandIT in the SQL plugin, adapted to the calcs dataset already shipped under src/test/resources/datasets/calcs/. Each test posts a PPL query through POST /_analytics/ppl (exposed by the test-ppl-frontend plugin), exercising the same UnifiedQueryPlanner → CalciteRelNodeVisitor → analytics-engine planner → Substrait → DataFusion pipeline as the SQL plugin's force-routed analytics path. CI for OpenSearch core can verify the where command end-to-end without checking out the SQL plugin or relying on its IT classpath.

Coverage (26 cases, all passing):
- Comparison operators (=, ==, !=, <, >, <=, >=)
- Boolean connectives AND, OR, NOT
- IS NULL / IS NOT NULL via isnull() / isnotnull()
- IN / NOT IN against keyword and numeric columns
- LIKE function and operator (with % and _ wildcards)
- contains (lowers to ILIKE, case-insensitive)
- Sub-expression scalar calls inside predicates: length, abs, +

Signed-off-by: Kai Huang <ahkcs@amazon.com>

---------

Signed-off-by: Kai Huang <ahkcs@amazon.com>
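
The ILIKE rewrite described in item 3 can be illustrated with a small, self-contained sketch. This is not the IlikeFunctionAdapter from the commit: the class name `IlikeRewriteSketch`, the toy LIKE matcher, and the `containsPattern` helper are all hypothetical, and Java's `String.toLowerCase` stands in for the backend's LOWER. It only aims to show why lowering both sides gives case-insensitive matching while `%`, `_`, and the `\` escape pass through unchanged.

```java
import java.util.Locale;

/** Hypothetical illustration only; not the adapter code from this commit. */
final class IlikeRewriteSketch {

    /** Toy case-sensitive LIKE: '%' matches any run, '_' matches one char, '\' escapes the next char. */
    static boolean like(String value, String pattern) {
        return match(value, 0, pattern, 0);
    }

    private static boolean match(String v, int i, String p, int j) {
        if (j == p.length()) {
            return i == v.length();
        }
        char c = p.charAt(j);
        if (c == '%') {
            // Try every possible length for the run absorbed by '%'.
            for (int k = i; k <= v.length(); k++) {
                if (match(v, k, p, j + 1)) {
                    return true;
                }
            }
            return false;
        }
        if (c == '\\' && j + 1 < p.length()) {
            // The escaped character must match literally.
            return i < v.length() && v.charAt(i) == p.charAt(j + 1) && match(v, i + 1, p, j + 2);
        }
        if (i == v.length()) {
            return false;
        }
        return (c == '_' || c == v.charAt(i)) && match(v, i + 1, p, j + 1);
    }

    /** ILIKE(value, pattern) expressed as LIKE(LOWER(value), LOWER(pattern)). */
    static boolean ilikeViaLower(String value, String pattern) {
        return like(value.toLowerCase(Locale.ROOT), pattern.toLowerCase(Locale.ROOT));
    }

    /** Assumed shape of a `contains` pattern: escape wildcards in the literal, then wrap in '%'. */
    static String containsPattern(String literal) {
        String escaped = literal.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_");
        return "%" + escaped + "%";
    }

    public static void main(String[] args) {
        System.out.println(like("FURNITURE", "%urn%"));
        System.out.println(ilikeViaLower("FURNITURE", "%urn%"));
        System.out.println(ilikeViaLower("save 50% today", containsPattern("50%")));
    }
}
```

In this sketch the plain `like` call does not match `FURNITURE` against `%urn%`, while `ilikeViaLower` does, which mirrors the `contains 'urn'` case in the test suite. Because the literal's `%` is escaped before the outer wildcards are added, `containsPattern("50%")` only matches values that contain the literal text `50%`.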
1 parent bd8e810 commit dcb68f9

2 files changed

Lines changed: 352 additions & 2 deletions

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java

Lines changed: 6 additions & 2 deletions
@@ -82,12 +82,16 @@ public class DataFusionAnalyticsBackendPlugin implements AnalyticsSearchBackendP
     // PPL command/function we want the analytics-engine planner to route through DataFusion. Add
     // here only after verifying the function deserializes through Substrait isthmus into a plan
     // DataFusion's native runtime can execute (see DataFusionFragmentConvertor for the conversion
-    // path). COALESCE is the lowering target of PPL `fillnull`.
+    // path). COALESCE is the lowering target of PPL `fillnull`. CAST is required because
+    // ReduceExpressionsRule.ProjectReduceExpressionsRule (in PlannerImpl) constant-folds field
+    // references through equality filters into typed literals — e.g. after `where str0 = 'FURNITURE'`,
+    // the projection `fields str0` is rewritten to `CAST('FURNITURE' AS VARCHAR)`. The remaining
+    // comparison / arithmetic / logical operators are project-capable for eval-style projections.
     private static final Set<ScalarFunction> STANDARD_PROJECT_OPS = Set.of(
         ScalarFunction.COALESCE,
         ScalarFunction.CEIL,
+        ScalarFunction.CAST,
         ScalarFunction.SARG_PREDICATE,
-        // comparison / arithmetic / logical operators in eval-style projections.
         ScalarFunction.EQUALS,
         ScalarFunction.NOT_EQUALS,
         ScalarFunction.GREATER_THAN,
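
As a rough illustration of why adding one enum constant to the capability set unblocks the planner, the sketch below models a project-side capability check. Everything in it is hypothetical: the `Fn` enum, `requireSupported`, and the exception type are stand-ins, not the real `ScalarFunction` or `OpenSearchProjectRule` code. Only the error message text is taken from the commit message.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical model of a project-capability lookup; not the planner code. */
final class ProjectCapabilitySketch {

    /** Stand-in for a subset of the ScalarFunction constants shown in the diff. */
    enum Fn { COALESCE, CEIL, CAST, SARG_PREDICATE, EQUALS, NOT_EQUALS, GREATER_THAN }

    /** Capability set before the change: CAST is absent. */
    static final Set<Fn> BEFORE = EnumSet.of(
        Fn.COALESCE, Fn.CEIL, Fn.SARG_PREDICATE, Fn.EQUALS, Fn.NOT_EQUALS, Fn.GREATER_THAN);

    /** Capability set after the change: CAST is project-capable. */
    static final Set<Fn> AFTER = EnumSet.allOf(Fn.class);

    /** Rejects a projected function that the backend has not declared (assumed behaviour). */
    static void requireSupported(Set<Fn> projectOps, Fn fn) {
        if (!projectOps.contains(fn)) {
            throw new IllegalStateException("No backend supports scalar function [" + fn + "]");
        }
    }

    public static void main(String[] args) {
        requireSupported(AFTER, Fn.CAST); // accepted once CAST is declared
        try {
            requireSupported(BEFORE, Fn.CAST);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under these assumptions, a projection such as `CAST('FURNITURE' AS VARCHAR)` produced by constant folding fails against the old set and passes against the new one.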
Lines changed: 346 additions & 0 deletions
@@ -0,0 +1,346 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.analytics.qa;
+
+import org.opensearch.client.Request;
+import org.opensearch.client.Response;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Self-contained integration test for PPL {@code where} on the analytics-engine route.
+ *
+ * <p>Mirrors the surface exercised by {@code CalciteWhereCommandIT} from the
+ * {@code opensearch-project/sql} repository, adapted to the {@code calcs} dataset
+ * shipped under {@code sandbox/qa/analytics-engine-rest/src/test/resources/datasets/calcs/}.
+ * Each test sends a PPL query through {@code POST /_analytics/ppl} (exposed by the
+ * {@code test-ppl-frontend} plugin), exercising the same {@code UnifiedQueryPlanner} →
+ * {@code CalciteRelNodeVisitor} → analytics-engine planner → Substrait → DataFusion
+ * pipeline as the SQL plugin's force-routed analytics path.
+ *
+ * <p>Top-level filter operators covered (see
+ * {@link org.opensearch.analytics.spi.ScalarFunction} → {@code STANDARD_FILTER_OPS} in
+ * {@code DataFusionAnalyticsBackendPlugin}):
+ * <ul>
+ *   <li>{@code = / == / != / &lt; / &gt; / &lt;= / &gt;=}</li>
+ *   <li>Boolean connectives {@code AND / OR / NOT}</li>
+ *   <li>{@code IS NULL} / {@code IS NOT NULL} via {@code isnull()} / {@code isnotnull()}</li>
+ *   <li>{@code IN} / {@code NOT IN}</li>
+ *   <li>{@code LIKE} (operator + function) and {@code contains} (lowers to {@code ILIKE})</li>
+ * </ul>
+ *
+ * <p>Sub-expression coverage (passed through to DataFusion via Substrait without
+ * appearing as the leaf-predicate operator): {@code length()}, {@code abs()},
+ * arithmetic {@code +}.
+ */
+public class WhereCommandIT extends AnalyticsRestTestCase {
+
+    private static final Dataset DATASET = new Dataset("calcs", "calcs");
+
+    private static boolean dataProvisioned = false;
+
+    /**
+     * Lazily provision the calcs dataset on first invocation. Same lazy-provision pattern
+     * as {@link FillNullCommandIT} — {@code client()} is only reliably available inside a
+     * test body, not in {@code @BeforeClass} / {@code setUp()}.
+     */
+    private void ensureDataProvisioned() throws IOException {
+        if (dataProvisioned == false) {
+            DatasetProvisioner.provision(client(), DATASET);
+            dataProvisioned = true;
+        }
+    }
+
+    // ── Comparison operators ────────────────────────────────────────────────
+
+    public void testWhereEqualOnKeyword() throws IOException {
+        // 2 rows have str0='FURNITURE'.
+        assertRowCount("source=" + DATASET.indexName + " | where str0 = 'FURNITURE' | fields str0", 2);
+    }
+
+    public void testWhereEqualOnDouble() throws IOException {
+        assertRows(
+            "source=" + DATASET.indexName + " | where num0 = 12.3 | fields str2, num0",
+            row("one", 12.3)
+        );
+    }
+
+    public void testWhereDoubleEqualOperator() throws IOException {
+        // == is parsed as = at the AstExpressionBuilder layer; same plan, same result.
+        assertRows(
+            "source=" + DATASET.indexName + " | where num0 == 12.3 | fields str2, num0",
+            row("one", 12.3)
+        );
+    }
+
+    public void testWhereNotEqual() throws IOException {
+        // 8 non-null distinct num0 values; != 0 keeps 7 rows (drops the single num0=0).
+        assertRowCount("source=" + DATASET.indexName + " | where num0 != 0 | fields num0", 7);
+    }
+
+    public void testWhereGreaterThan() throws IOException {
+        // num0 > 0 → {12.3, 15.7, 3.5, 10}.
+        assertRowCount("source=" + DATASET.indexName + " | where num0 > 0 | fields num0", 4);
+    }
+
+    public void testWhereGreaterEqual() throws IOException {
+        // num0 >= 0 → adds the row with num0=0 → 5 rows.
+        assertRowCount("source=" + DATASET.indexName + " | where num0 >= 0 | fields num0", 5);
+    }
+
+    public void testWhereLessThan() throws IOException {
+        // num0 < 0 → {-12.3, -15.7, -3.5}.
+        assertRowCount("source=" + DATASET.indexName + " | where num0 < 0 | fields num0", 3);
+    }
+
+    public void testWhereLessEqual() throws IOException {
+        // num0 <= 0 → adds num0=0 → 4 rows.
+        assertRowCount("source=" + DATASET.indexName + " | where num0 <= 0 | fields num0", 4);
+    }
+
// ── Boolean connectives ─────────────────────────────────────────────────
111+
112+
public void testWhereAnd() throws IOException {
113+
// FURNITURE rows are key00 (num0=12.3) and key01 (num0=-12.3); AND num0>0 keeps key00.
114+
assertRows(
115+
"source=" + DATASET.indexName + " | where str0 = 'FURNITURE' and num0 > 0 | fields str2, num0",
116+
row("one", 12.3)
117+
);
118+
}
119+
120+
public void testWhereOr() throws IOException {
121+
// num0 == 12.3 OR num0 == -12.3 → key00, key01.
122+
assertRowCount(
123+
"source=" + DATASET.indexName + " | where num0 == 12.3 OR num0 == -12.3 | fields num0",
124+
2
125+
);
126+
}
127+
128+
public void testWhereNot() throws IOException {
129+
// NOT (str0 = 'FURNITURE') → 17 - 2 = 15 rows. (str0 has no nulls in calcs.)
130+
assertRowCount(
131+
"source=" + DATASET.indexName + " | where not str0 = 'FURNITURE' | fields str0",
132+
15
133+
);
134+
}
135+
136+
public void testWhereMultipleChained() throws IOException {
137+
// Three filter steps: FURNITURE → num0>0 → str2='one'. Should leave one row.
138+
assertRows(
139+
"source=" + DATASET.indexName
140+
+ " | where str0 = 'FURNITURE'"
141+
+ " | where num0 > 0"
142+
+ " | where str2 = 'one'"
143+
+ " | fields str0, num0, str2",
144+
row("FURNITURE", 12.3, "one")
145+
);
146+
}
147+
148+
// ── NULL handling via isnull() / isnotnull() ────────────────────────────
149+
150+
public void testWhereIsNull() throws IOException {
151+
// str2 has 4 null rows in calcs.
152+
assertRowCount(
153+
"source=" + DATASET.indexName + " | where isnull(str2) | fields str2",
154+
4
155+
);
156+
}
157+
158+
public void testWhereIsNotNull() throws IOException {
159+
// str2 has 13 non-null rows in calcs.
160+
assertRowCount(
161+
"source=" + DATASET.indexName + " | where isnotnull(str2) | fields str2",
162+
13
163+
);
164+
}
165+
166+
// ── IN / NOT IN ─────────────────────────────────────────────────────────
167+
168+
public void testWhereInOnKeyword() throws IOException {
169+
// FURNITURE (2) + OFFICE SUPPLIES (6) = 8.
170+
assertRowCount(
171+
"source=" + DATASET.indexName + " | where str0 in ('FURNITURE', 'OFFICE SUPPLIES') | fields str0",
172+
8
173+
);
174+
}
175+
176+
public void testWhereInOnNumeric() throws IOException {
177+
// num0 IN (12.3, -12.3) → key00, key01 = 2 rows.
178+
assertRowCount(
179+
"source=" + DATASET.indexName + " | where num0 in (12.3, -12.3) | fields num0",
180+
2
181+
);
182+
}
183+
184+
public void testWhereNotIn() throws IOException {
185+
// Complement of (FURNITURE, OFFICE SUPPLIES): 9 TECHNOLOGY rows.
186+
assertRowCount(
187+
"source=" + DATASET.indexName + " | where not str0 in ('FURNITURE', 'OFFICE SUPPLIES') | fields str0",
188+
9
189+
);
190+
}
191+
192+
// ── LIKE function and operator ──────────────────────────────────────────
193+
194+
public void testWhereLikeFunction() throws IOException {
195+
// like(str0, 'FURN%') → 2 FURNITURE rows.
196+
assertRowCount(
197+
"source=" + DATASET.indexName + " | where like(str0, 'FURN%') | fields str0",
198+
2
199+
);
200+
}
201+
202+
public void testWhereLikeOperator() throws IOException {
203+
// str0 LIKE 'OFF%' → 6 OFFICE SUPPLIES rows.
204+
assertRowCount(
205+
"source=" + DATASET.indexName + " | where str0 LIKE 'OFF%' | fields str0",
206+
6
207+
);
208+
}
209+
210+
public void testWhereLikeUnderscoreWildcard() throws IOException {
211+
// 'on_' matches 'one' only (3 chars starting with "on").
212+
assertRows(
213+
"source=" + DATASET.indexName + " | where str2 LIKE 'on_' | fields str2",
214+
row("one")
215+
);
216+
}
217+
218+
public void testWhereLikeNoMatch() throws IOException {
219+
assertRowCount(
220+
"source=" + DATASET.indexName + " | where like(str0, 'XYZ%') | fields str0",
221+
0
222+
);
223+
}
224+
225+
// ── CONTAINS (lowers to ILIKE — case-insensitive) ───────────────────────
226+
227+
public void testWhereContains() throws IOException {
228+
// 'URN' inside FURNITURE → 2 rows.
229+
assertRowCount(
230+
"source=" + DATASET.indexName + " | where str0 contains 'URN' | fields str0",
231+
2
232+
);
233+
}
234+
235+
public void testWhereContainsCaseInsensitive() throws IOException {
236+
// Lowercase pattern still hits FURNITURE because contains uses ILIKE.
237+
assertRowCount(
238+
"source=" + DATASET.indexName + " | where str0 contains 'urn' | fields str0",
239+
2
240+
);
241+
}
242+
243+
// ── Sub-expression scalar calls (pass through to DataFusion) ────────────
244+
245+
public void testWhereInnerLength() throws IOException {
246+
// length('FURNITURE') = 9 → 2 rows.
247+
assertRowCount(
248+
"source=" + DATASET.indexName + " | where length(str0) = 9 | fields str0",
249+
2
250+
);
251+
}
252+
253+
public void testWhereInnerAbs() throws IOException {
254+
// abs(num0) > 10 → {-15.7, -12.3, 12.3, 15.7} = 4 rows.
255+
assertRowCount(
256+
"source=" + DATASET.indexName + " | where abs(num0) > 10 | fields num0",
257+
4
258+
);
259+
}
260+
261+
public void testWhereInnerArithmetic() throws IOException {
262+
// num0 + 100 > 105 ⇔ num0 > 5 → {12.3, 15.7, 10} = 3 rows.
263+
assertRowCount(
264+
"source=" + DATASET.indexName + " | where num0 + 100 > 105 | fields num0",
265+
3
266+
);
267+
}
268+
269+
// ── Helpers ─────────────────────────────────────────────────────────────
270+
271+
private static List<Object> row(Object... values) {
272+
return Arrays.asList(values);
273+
}
274+
275+
/**
276+
* Assert that the PPL query returns exactly {@code expectedCount} rows. Used when the
277+
* exact row contents would be brittle (e.g. set membership tests where row order is not
278+
* guaranteed by the engine).
279+
*/
280+
private void assertRowCount(String ppl, int expectedCount) throws IOException {
281+
Map<String, Object> response = executePpl(ppl);
282+
@SuppressWarnings("unchecked")
283+
List<List<Object>> actualRows = (List<List<Object>>) response.get("rows");
284+
assertNotNull("Response missing 'rows' field for query: " + ppl, actualRows);
285+
assertEquals(
286+
"Row count mismatch for query: " + ppl + " — got rows: " + actualRows,
287+
expectedCount,
288+
actualRows.size()
289+
);
290+
}
291+
292+
/**
293+
* Assert exact row contents. Mirrors {@link FillNullCommandIT#assertRows} including the
294+
* numeric-tolerant cell comparator (Jackson parsing returns Integer/Long/Double per JSON
295+
* shape, but PPL doesn't preserve that distinction at the API surface).
296+
*/
297+
@SafeVarargs
298+
@SuppressWarnings("varargs")
299+
private final void assertRows(String ppl, List<Object>... expected) throws IOException {
300+
Map<String, Object> response = executePpl(ppl);
301+
@SuppressWarnings("unchecked")
302+
List<List<Object>> actualRows = (List<List<Object>>) response.get("rows");
303+
assertNotNull("Response missing 'rows' field for query: " + ppl, actualRows);
304+
assertEquals("Row count mismatch for query: " + ppl, expected.length, actualRows.size());
305+
for (int i = 0; i < expected.length; i++) {
306+
List<Object> want = expected[i];
307+
List<Object> got = actualRows.get(i);
308+
assertEquals(
309+
"Column count mismatch at row " + i + " for query: " + ppl,
310+
want.size(),
311+
got.size()
312+
);
313+
for (int j = 0; j < want.size(); j++) {
314+
assertCellEquals(
315+
"Cell mismatch at row " + i + ", col " + j + " for query: " + ppl,
316+
want.get(j),
317+
got.get(j)
318+
);
319+
}
320+
}
321+
}
322+
323+
private Map<String, Object> executePpl(String ppl) throws IOException {
324+
ensureDataProvisioned();
325+
Request request = new Request("POST", "/_analytics/ppl");
326+
request.setJsonEntity("{\"query\": \"" + escapeJson(ppl) + "\"}");
327+
Response response = client().performRequest(request);
328+
return assertOkAndParse(response, "PPL: " + ppl);
329+
}
330+
331+
private static void assertCellEquals(String message, Object expected, Object actual) {
332+
if (expected == null || actual == null) {
333+
assertEquals(message, expected, actual);
334+
return;
335+
}
336+
if (expected instanceof Number && actual instanceof Number) {
337+
double e = ((Number) expected).doubleValue();
338+
double a = ((Number) actual).doubleValue();
339+
if (Double.compare(e, a) != 0) {
340+
fail(message + ": expected <" + expected + "> but was <" + actual + ">");
341+
}
342+
return;
343+
}
344+
assertEquals(message, expected, actual);
345+
}
346+
}
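
The numeric-tolerant comparison used by the cell assertion can be tried in isolation. The sketch below restates that logic as a boolean predicate; the class and method names (`CellEqualitySketch`, `cellEquals`) are hypothetical and are not part of the commit. It is meant to show why a JSON value parsed as `Integer` still matches an expected `Double`, and where `Double.compare` is stricter than `==`.

```java
import java.util.Objects;

/** Hypothetical stand-alone restatement of the numeric-tolerant cell comparison. */
final class CellEqualitySketch {

    /** Compares two cells, widening any pair of Numbers to double before comparing. */
    static boolean cellEquals(Object expected, Object actual) {
        if (expected == null || actual == null) {
            return expected == actual; // equal only when both are null
        }
        if (expected instanceof Number && actual instanceof Number) {
            double e = ((Number) expected).doubleValue();
            double a = ((Number) actual).doubleValue();
            return Double.compare(e, a) == 0;
        }
        return expected.equals(actual);
    }

    public static void main(String[] args) {
        Object parsed = Integer.valueOf(10); // e.g. JSON `10` parsed as an Integer
        Object wanted = Double.valueOf(10.0);
        System.out.println(Objects.equals(wanted, parsed)); // boxed types differ
        System.out.println(cellEquals(wanted, parsed));     // numeric values agree
    }
}
```

One consequence of using `Double.compare` rather than `==` is that `0.0` and `-0.0` are treated as different cells, while `NaN` compares equal to `NaN`. Widening to double can also lose precision for very large `long` values, so this style of comparison suits the small values in a dataset like `calcs` better than arbitrary 64-bit integers.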
