Skip to content

Commit b6c4b9d

Browse files
authored
[Analytics Backend / DataFusion] Substrait Plan.Root.names + CASE + untyped-NULL fixes for multisearch (opensearch-project#21528)
* [Analytics Backend / DataFusion] Fix Plan.Root.names mismatch for schema-reshaping wrappers `DataFusionFragmentConvertor.rewire` always populated the new `Plan.Root.names` list with the *inner* plan's names. For schema-preserving wrappers (Sort, Filter, Fetch) those happen to coincide with the wrapper's output schema, so the bug was hidden. For schema-reshaping wrappers (Aggregate, Project) the wrapper's output width differs from the inner's, and DataFusion's substrait consumer rejects the plan in `make_renamed_schema` with: Substrait error: Names list must match exactly to nested schema, but found {wrapper-width} uses for {inner-width} names This shape is hit by every PPL `multisearch` query whose coordinator stage is `Sort(Aggregate(Union(StageInputScan, StageInputScan)))` — the Aggregate narrows the wide Union row type, and the inner-names override surfaced the mismatch as a 500. Fix: derive the new `Plan.Root.names` from the wrapper RelNode's row type (`fragment.getRowType().getFieldList()`), not the inner plan. Both `attachFragmentOnTop` and `attachPartialAggOnTop` already have the wrapper RelNode in scope, so this is a local change with no signature ripple beyond adding a `List<String> wrapperNames` parameter to `rewire`. Test coverage: - `testAttachPartialAggOnTop_PlanRootNamesMatchWrapperOutput` — the partial-agg path with a 3-column inner scan and a 1-column wrapper aggregate; pins names to the wrapper's output. - `testAttachFragmentOnTop_AggregateOverMultiColumnInner_PlanRootNamesMatchWrapperOutput` — the multisearch coordinator-stage shape (`Aggregate(Union)`). - `testMultisearchShape_SortOverAggregateOverThreeWayUnion_PlanRootNamesMatchTopOutput` — full chain `Sort → Aggregate → Union(Sin × 3)` modeling the `testMultisearchWithThreeSubsearches` query plan. - `testMultisearchShape_SystemLimitOverSortOverAggregateOverUnion_NamesMatchTopOutput` — adds the implicit `LogicalSystemLimit` wrapper that `QueryService.convertToCalcitePlan` injects at the top of every analytics-engine plan, lowered to a Substrait `Fetch`. End-to-end validation against `:integ-test:integTestRemote --tests 'org.opensearch.sql.calcite.remote.CalciteMultisearchCommandIT'` with `-Dtests.analytics.force_routing=true -Dtests.analytics.parquet_indices=true`: 21 tests, **5 pass** (was 0/21 before this change). The remaining 16 are blocked on orthogonal issues that are out of scope here: - `Field [...] not found.` analyzer errors (8 tests) — pre-existing parquet-backed-index field-resolution gap. - TIMESTAMP / SPAN scalar functions unsupported (4 tests). - AssertionError on error-message format (2 tests). - One residual `Names list must match exactly to nested schema, but found 2 uses for 6 names` on `testMultisearchWithThreeSubsearches` — likely a different code path (PARTIAL/FINAL split via `OpenSearchAggregateSplitRule`) that the convertor unit tests don't exercise; tracked for follow-up. - Two timeouts/long-running on the largest queries. Signed-off-by: Kai Huang <ahkcs@amazon.com> * [QA] Add MultisearchCommandIT for the analytics-engine REST path Lands a self-contained QA IT covering PPL `multisearch` so the analytics-route fix in this PR is exercised inside core without cross-plugin dependencies on the SQL plugin. Three tests, scoped to the surface analytics already supports end-to-end: | Test | Shape | |---|---| | `testMultisearchTwoBranchesByCategory` | Basic 2-way Union over int0 buckets — `Union(Filter+Eval+Project, Filter+Eval+Project)` followed by `Aggregate(count by) | sort`. Exercises the same convertReduceFragment chain (`attachFragmentOnTop(Sort, attachFragmentOnTop(Aggregate, convertFinalAggFragment(Union)))`) that the rewire fix targets. | | `testMultisearchThreeBranchesByStr0` | 3-way Union — the exact `Union(ER, ER, ER)` shape that surfaced the residual "2 uses for 6 names" failure I'd flagged as a follow-up in an earlier draft of the PR description; the rewire fix already covers it on a fresh cluster. | | `testMultisearchSingleSubsearchRejected` | Arity check — pinned at the parser layer (AstBuilder.visitMultisearchCommand rejects <2 subsearches with `SyntaxCheckException`). Regression-pin against accidental relaxation of that guard. | Each branch projects to a scalar-only field set (`fields int0, class` / `fields str0, bucket`) so the union row type sidesteps the calcs dataset's date/time/datetime columns — `ArrowSchemaFromCalcite.toArrowType` doesn't yet handle TIMESTAMP, tracked separately. Bumps `test-ppl-frontend`'s `unified-query-*` dependency from 3.6.0.0-SNAPSHOT to 3.7.0.0-SNAPSHOT so the bundled PPL grammar exposes the `multisearch` keyword (along with table/regex/rex/convert added since 3.6). The SQL Snapshots repo (already declared in the build) carries the published 3.7 artifacts; for local sql-repo HEAD development, run `./gradlew :ppl:publishUnifiedQueryPublicationToMavenLocal` from the sql repo. The version bump is independent of the in-flight test-ppl-frontend UnifiedQueryService.setting() change in opensearch-project#21526 — different files, no conflict. Validates: 3/3 MultisearchCommandIT pass; full `:sandbox:qa:analytics-engine-rest:integTest` suite still green (110 tests across 14 ITs). Signed-off-by: Kai Huang <ahkcs@amazon.com> * [Analytics Backend / DataFusion] Spotless reformat of testFinalAggInnerStageScanRowType Single-line spotless reformat in DataFusionFragmentConvertorTests: the OpenSearchStageInputScan constructor call now fits on one line (it was previously broken across multiple lines). Originally bumped sandbox/plugins/analytics-backend-datafusion's sqlUnifiedQueryVersion 3.6 -> 3.7 to align with test-ppl-frontend, but the entire internalClusterTest classpath block (including that pin) was removed upstream by opensearch-project#21555 (dbe4a42, "Enable Lucene Filter delegation from Datafusion for Correctness"). The build.gradle hunk dropped during rebase; only the spotless reformat survives. Signed-off-by: Kai Huang <ahkcs@amazon.com> * [Analytics Backend / DataFusion] Register CASE in project capabilities + QA IT Calcite emits SqlKind.CASE for any conditional expression — explicit `eval x = case(cond, val, …)` in PPL, plus the `count(eval(predicate))` conditional-count idiom (lowered to COUNT(CASE WHEN predicate THEN … END)) and several other shapes. Without CASE in `STANDARD_PROJECT_OPS`, the analytics planner rejected the operator with `No backend supports scalar function [CASE] among [datafusion]` before substrait emission. CASE doesn't need a backend adapter: isthmus translates SqlKind.CASE structurally to a Substrait IfThen rel, and DataFusion's substrait consumer handles IfThen natively. Just registering the capability is enough. Adds `testMultisearchEvalCaseProjection` to MultisearchCommandIT to pin the end-to-end path — multisearch + `eval bucket = case(cond, val else default)` + stats. Uses an explicit `else` arm so isthmus doesn't have to convert an untyped NULL literal; the implicit-else `count(eval(…))` shape that the v2-side testMultisearchSuccessRatePattern uses still hits a separate isthmus limitation (`Unable to convert the type NULL` from `TypeConverter` on a SqlTypeName.NULL literal — tracked separately, out of scope here). Validates: 4/4 MultisearchCommandIT pass; full :sandbox:qa:analytics-engine-rest:integTest suite still green (111 tests across 14 ITs). Signed-off-by: Kai Huang <ahkcs@amazon.com> * [Analytics Backend / DataFusion] Pre-isthmus untyped-NULL rewriter for CASE arms Calcite emits a `RexLiteral` with `SqlTypeName.NULL` for the implicit ELSE arm of `CASE WHEN cond THEN val END` — exactly the shape PPL `count(eval(predicate))` lowers to (`COUNT(CASE WHEN predicate THEN <projected> END)`). Isthmus' `TypeConverter.toSubstrait` rejects `SqlTypeName.NULL` with `Unable to convert the type NULL`, blocking the analytics path before substrait emission. Adds `UntypedNullPreprocessor` — a `RelHomogeneousShuttle` + `RexShuttle` pass applied in `convertToSubstrait` and `convertStandalone` *before* the SubstraitRelVisitor sees the plan. Walks every CASE call's value operands (THEN arms and the ELSE arm) and substitutes any `SqlTypeName.NULL` literal with a typed null literal matching the CASE's resolved return type. Calcite already widens the CASE's return type to the leastRestrictive of branches, so the substituted type is correct by construction. Scope is intentionally narrow: only CASE call operands are rewritten today. Other untyped-NULL contexts (function arguments, comparison RHS) are rare in PPL-generated plans and would need per-operator type inference to do safely; defer until a concrete test surfaces one. Test coverage: - `UntypedNullPreprocessorTests` (4 new): * `testCountEvalCaseRewritesElseNullToTypedNull` — the motivating shape `COUNT(CASE WHEN cond THEN 1 ELSE null END)`. * `testCaseWithThenNullIsAlsoRewritten` — null in the THEN arm. * `testCaseConditionOperandUnchanged` — even-index condition operands left alone. * `testCountOverRewrittenCaseProjectionTypechecks` — Aggregate(Project(CASE)) with the rewriter applied still type-checks end-to-end. - New `testMultisearchCountEvalConditionalCount` in MultisearchCommandIT — mirrors the v2-side `CalciteMultisearchCommandIT.testMultisearchSuccessRatePattern` shape (`count(eval(predicate))`) end-to-end on the analytics-engine REST path. Validates: 5/5 MultisearchCommandIT pass; 4/4 new + 12/12 existing FragmentConvertor unit tests; full :sandbox:qa:analytics-engine-rest:integTest suite still green (112 tests across 14 ITs). Signed-off-by: Kai Huang <ahkcs@amazon.com> * [Analytics Backend / DataFusion] Simplify testCaseConditionOperandUnchanged for spotless The original assertion wrapped a no-op single-iteration loop around an empty- body RexShuttle whose accept() result is just caseExpr.toString(). Spotless flagged the empty class body (`new RexShuttle() {}`) as a formatting violation and tried to wrap it across two lines, which read worse than the underlying intent — comparing the input CASE expression to the rewriter's output to prove no-op behavior when no untyped nulls are present. Replace the loop+shuttle with a direct `assertEquals(caseExpr.toString(), rewrittenCase.toString())` — same semantics, cleaner code, no awkward formatting. The test still asserts the rewriter doesn't touch CASE expressions whose operands are already typed. Sandbox check (`./gradlew check -p sandbox -Dsandbox.enabled=true`) now passes end-to-end. Signed-off-by: Kai Huang <ahkcs@amazon.com> --------- Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 8560342 commit b6c4b9d

7 files changed

Lines changed: 768 additions & 12 deletions

File tree

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,14 @@ public class DataFusionAnalyticsBackendPlugin implements AnalyticsSearchBackendP
101101
ScalarFunction.CAST,
102102
ScalarFunction.CONCAT,
103103
ScalarFunction.SAFE_CAST,
104+
// CASE — Calcite emits CASE WHEN ... THEN ... END for conditional expressions, including
105+
// PPL `count(eval(predicate))` (lowered to COUNT(CASE WHEN predicate THEN ... ELSE NULL END))
106+
// and explicit `eval x = case(cond, val, ...)`. Isthmus translates SqlKind.CASE structurally
107+
// to a Substrait IfThen rel — no extension lookup needed, no adapter required. DataFusion's
108+
// substrait consumer handles IfThen natively. Without this entry, the analytics planner
109+
// rejects the operator with "No backend supports scalar function [CASE] among [datafusion]"
110+
// before substrait emission.
111+
ScalarFunction.CASE,
104112
// ABS / SUBSTRING — `eval x = abs(...)` and `eval s = substring(...)` projections that PPL
105113
// sort-pushdown moves into the project tree (see CalciteSortCommandIT
106114
// testPushdownSortExpressionContainsNull and CalcitePPLSortIT

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionFragmentConvertor.java

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@
7070
* <li>{@link #attachPartialAggOnTop(RelNode, byte[])} and
7171
* {@link #attachFragmentOnTop(RelNode, byte[])} — convert the wrapping
7272
* operator standalone, then rewire its input to the decoded inner plan's
73-
* root via {@link #rewire(Plan, Rel)}.</li>
73+
* root via {@link #rewire(Plan, Rel, List)}.</li>
7474
* </ul>
7575
*
7676
* @opensearch.internal
@@ -126,7 +126,11 @@ public byte[] attachPartialAggOnTop(RelNode partialAggFragment, byte[] innerByte
126126
LOGGER.debug("Attaching partial aggregate on top of {} inner bytes", innerBytes.length);
127127
Plan inner = decodePlan(innerBytes);
128128
Rel wrapper = convertStandalone(partialAggFragment);
129-
Plan rewired = rewire(inner, withAggregationPhase(wrapper, Expression.AggregationPhase.INITIAL_TO_INTERMEDIATE));
129+
Plan rewired = rewire(
130+
inner,
131+
withAggregationPhase(wrapper, Expression.AggregationPhase.INITIAL_TO_INTERMEDIATE),
132+
fieldNames(partialAggFragment)
133+
);
130134
return serializePlan(rewired);
131135
}
132136

@@ -150,14 +154,19 @@ public byte[] attachFragmentOnTop(RelNode fragment, byte[] innerBytes) {
150154
// the visitor still walks them top-down to build the wrapper rel.
151155
RelNode rewritten = rewriteStageInputScans(fragment);
152156
Rel wrapper = convertStandalone(rewritten);
153-
return serializePlan(rewire(inner, wrapper));
157+
return serializePlan(rewire(inner, wrapper, fieldNames(fragment)));
154158
}
155159

156160
// ── Core conversion helpers ─────────────────────────────────────────────────
157161

158162
private byte[] convertToSubstrait(RelNode fragment) {
159-
RelRoot root = RelRoot.of(fragment, SqlKind.SELECT);
160-
SubstraitRelVisitor visitor = createVisitor(fragment);
163+
// Rewrite SqlTypeName.NULL literals (Calcite's untyped null, emitted for the
164+
// implicit ELSE arm of CASE) to typed nulls — isthmus' TypeConverter rejects NULL
165+
// with "Unable to convert the type NULL". The widening only changes literal type
166+
// tags; semantics and field names (used by Plan.Root.names) are unchanged.
167+
RelNode preprocessed = UntypedNullPreprocessor.rewrite(fragment);
168+
RelRoot root = RelRoot.of(preprocessed, SqlKind.SELECT);
169+
SubstraitRelVisitor visitor = createVisitor(preprocessed);
161170
Rel substraitRel = visitor.apply(root.rel);
162171

163172
List<String> fieldNames = root.fields.stream().map(field -> field.getValue()).toList();
@@ -178,11 +187,15 @@ private byte[] convertToSubstrait(RelNode fragment) {
178187
* children (e.g. the {@code attachPartialAggOnTop} caller passes a
179188
* {@code LogicalAggregate} whose input is the already-stripped inner tree); we
180189
* deliberately discard those children by taking only the outermost rel of the
181-
* conversion and rewiring its input during {@link #rewire(Plan, Rel)}.
190+
* conversion and rewiring its input during {@link #rewire(Plan, Rel, List)}.
182191
*/
183192
private Rel convertStandalone(RelNode operator) {
184-
SubstraitRelVisitor visitor = createVisitor(operator);
185-
return visitor.apply(operator);
193+
// Same untyped-NULL preprocessing rationale as convertToSubstrait — the standalone
194+
// wrapper conversion is just as susceptible to a SqlTypeName.NULL literal lurking in
195+
// a CASE call attached on top of an inner plan.
196+
RelNode preprocessed = UntypedNullPreprocessor.rewrite(operator);
197+
SubstraitRelVisitor visitor = createVisitor(preprocessed);
198+
return visitor.apply(preprocessed);
186199
}
187200

188201
/**
@@ -191,15 +204,28 @@ private Rel convertStandalone(RelNode operator) {
191204
* {@code wrapper(inner.root)}. Supports the known single-input wrappers emitted
192205
* by our four SPI methods ({@link Aggregate}, {@link Sort}, {@link Filter},
193206
* {@link Project}).
207+
*
208+
* <p>{@code wrapperNames} must be the wrapper's output column names — typically
209+
* derived from the wrapper {@link RelNode}'s row type. For schema-preserving
210+
* wrappers (Sort, Filter, Fetch) these match the inner plan's names; for
211+
* schema-reshaping wrappers (Aggregate, Project) they don't, and using the
212+
* inner's names there causes DataFusion's substrait consumer to reject the
213+
* Plan with a "Names list must match exactly to nested schema" error in
214+
* {@code make_renamed_schema}.
194215
*/
195-
static Plan rewire(Plan inner, Rel wrapper) {
216+
static Plan rewire(Plan inner, Rel wrapper, List<String> wrapperNames) {
196217
if (inner.getRoots().isEmpty()) {
197218
throw new IllegalArgumentException("Inner Substrait plan has no root relation to rewire under wrapper");
198219
}
199220
Plan.Root innerRoot = inner.getRoots().get(0);
200221
Rel innerRel = innerRoot.getInput();
201222
Rel rewired = replaceInput(wrapper, innerRel);
202-
return Plan.builder().addRoots(Plan.Root.builder().input(rewired).names(innerRoot.getNames()).build()).build();
223+
return Plan.builder().addRoots(Plan.Root.builder().input(rewired).names(wrapperNames).build()).build();
224+
}
225+
226+
/** Extracts a wrapper's output column names from its Calcite row type. */
227+
private static List<String> fieldNames(RelNode fragment) {
228+
return fragment.getRowType().getFieldList().stream().map(RelDataTypeField::getName).toList();
203229
}
204230

205231
private static Rel replaceInput(Rel wrapper, Rel newInput) {
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.be.datafusion;
10+
11+
import org.apache.calcite.rel.RelHomogeneousShuttle;
12+
import org.apache.calcite.rel.RelNode;
13+
import org.apache.calcite.rex.RexBuilder;
14+
import org.apache.calcite.rex.RexCall;
15+
import org.apache.calcite.rex.RexLiteral;
16+
import org.apache.calcite.rex.RexNode;
17+
import org.apache.calcite.rex.RexShuttle;
18+
import org.apache.calcite.sql.SqlKind;
19+
import org.apache.calcite.sql.type.SqlTypeName;
20+
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
24+
/**
25+
* Pre-isthmus pass that rewrites untyped {@code NULL} literals
26+
* ({@code RexLiteral} with {@link SqlTypeName#NULL}) to typed null literals
27+
* inferred from their enclosing operator.
28+
*
29+
* <p>Calcite emits an untyped NULL for the implicit {@code ELSE} arm of
30+
* {@code CASE WHEN cond THEN val END}, which is exactly the shape PPL
31+
* {@code count(eval(predicate))} lowers to:
32+
*
33+
* <pre>{@code
34+
* COUNT(CASE WHEN predicate THEN <projected> END) // ELSE is implicit NULL, type=NULL
35+
* }</pre>
36+
*
37+
* <p>Isthmus' {@code TypeConverter.toSubstrait} rejects {@link SqlTypeName#NULL}
38+
* with {@code "Unable to convert the type NULL"}. The CASE call's resolved
39+
* return type already carries the right widened type ({@code NULLABLE BIGINT}
40+
* for the count-eval shape, etc), so we substitute that.
41+
*
42+
* <p>Scope: only CASE call operands are rewritten today. Other untyped-NULL
43+
* sites (function arguments, comparison RHS, etc) are rare in PPL-generated
44+
* plans and would need per-operator type-inference to do safely; defer until
45+
* a concrete test surfaces one.
46+
*
47+
* @opensearch.internal
48+
*/
49+
final class UntypedNullPreprocessor {
50+
51+
private UntypedNullPreprocessor() {}
52+
53+
/**
54+
* Walk the RelNode tree, applying the rewrite to every node's expressions.
55+
* Returns a new tree if any rewrite occurred, otherwise the input unchanged.
56+
*/
57+
static RelNode rewrite(RelNode root) {
58+
return root.accept(new RelHomogeneousShuttle() {
59+
@Override
60+
public RelNode visit(RelNode other) {
61+
RelNode visited = super.visit(other);
62+
return visited.accept(new CaseUntypedNullShuttle(visited.getCluster().getRexBuilder()));
63+
}
64+
});
65+
}
66+
67+
/**
68+
* Per-node rex shuttle: for every {@code CASE} call encountered, rewrite any
69+
* {@link SqlTypeName#NULL}-typed literal operand into a typed null literal
70+
* matching the CASE's resolved return type.
71+
*/
72+
private static final class CaseUntypedNullShuttle extends RexShuttle {
73+
74+
private final RexBuilder rexBuilder;
75+
76+
CaseUntypedNullShuttle(RexBuilder rexBuilder) {
77+
this.rexBuilder = rexBuilder;
78+
}
79+
80+
@Override
81+
public RexNode visitCall(RexCall call) {
82+
// Recurse first so nested CASE calls are rewritten bottom-up — each inner CASE is
83+
// resolved against its own return type, so by the time we look at the outer one,
84+
// every operand is already typed.
85+
RexCall recursed = (RexCall) super.visitCall(call);
86+
if (recursed.getKind() != SqlKind.CASE) {
87+
return recursed;
88+
}
89+
List<RexNode> operands = recursed.getOperands();
90+
List<RexNode> rewritten = new ArrayList<>(operands.size());
91+
boolean changed = false;
92+
for (int i = 0; i < operands.size(); i++) {
93+
RexNode op = operands.get(i);
94+
if (isCaseValueOperand(i, operands.size()) && isUntypedNullLiteral(op)) {
95+
rewritten.add(rexBuilder.makeNullLiteral(recursed.getType()));
96+
changed = true;
97+
} else {
98+
rewritten.add(op);
99+
}
100+
}
101+
return changed ? recursed.clone(recursed.getType(), rewritten) : recursed;
102+
}
103+
104+
/**
105+
* Calcite's CASE operand layout is {@code [cond1, val1, cond2, val2, …, condN, valN, else]}.
106+
* Conditions sit at even indices except the last operand (the ELSE), which is always
107+
* a value. Returns true for value operands (the THEN/ELSE arms).
108+
*/
109+
private static boolean isCaseValueOperand(int index, int total) {
110+
return (index % 2 == 1) || (index == total - 1);
111+
}
112+
113+
private static boolean isUntypedNullLiteral(RexNode node) {
114+
if (!(node instanceof RexLiteral lit)) {
115+
return false;
116+
}
117+
return lit.isNull() && lit.getType().getSqlTypeName() == SqlTypeName.NULL;
118+
}
119+
}
120+
}

0 commit comments

Comments
 (0)