diff --git a/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java index ddfe5fd3556..d802c011a48 100644 --- a/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java +++ b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java
@@ -10,9 +10,17 @@ import java.util.List; import java.util.Map; import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.hep.HepPlanner; +import org.apache.calcite.plan.hep.HepProgram; +import org.apache.calcite.plan.hep.HepProgramBuilder; +import org.apache.calcite.rel.RelHomogeneousShuttle; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.rules.CoreRules; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexShuttle; +import org.apache.calcite.rex.RexSubQuery; +import org.apache.calcite.sql2rel.RelDecorrelator; import org.opensearch.analytics.exec.QueryPlanExecutor; import org.opensearch.core.action.ActionListener; import org.opensearch.sql.ast.statement.ExplainMode;
@@ -77,15 +85,16 @@ public void execute( // taken by SimpleJsonResponseFormatter sees the correct value. ProfileMetric execMetric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE); long execStart = System.nanoTime(); + final RelNode executablePlan = removeSubQueries(plan, context); planExecutor.execute( - plan, + executablePlan, null, new ActionListener<>() { @Override public void onResponse(Iterable rows) { try { - List fields = plan.getRowType().getFieldList(); + List fields = executablePlan.getRowType().getFieldList(); List results = convertRows(rows, fields); Schema schema = buildSchema(fields); execMetric.set(System.nanoTime() - execStart);
@@ -109,6 +118,7 @@ public void explain( CalcitePlanContext context, ResponseListener listener) { try { + plan = removeSubQueries(plan, context); String logical = RelOptUtil.toString(plan, mode.toExplainLevel()); ExplainResponse response = new ExplainResponse(new ExplainResponseNodeV2(logical, null, null));
@@ -148,4 +158,65 @@ private ExprType convertType(RelDataType type) { return org.opensearch.sql.data.type.ExprCoreType.UNKNOWN; } } + + /** HEP program that rewrites RexSubQuery in Filter / Project / Join into correlates. */ + private static final HepProgram SUBQUERY_REMOVE_PROGRAM = + new HepProgramBuilder() + .addRuleInstance(CoreRules.FILTER_SUB_QUERY_TO_CORRELATE) + .addRuleInstance(CoreRules.PROJECT_SUB_QUERY_TO_CORRELATE) + .addRuleInstance(CoreRules.JOIN_SUB_QUERY_TO_CORRELATE) + .build(); + + /** + * Converts every {@link org.apache.calcite.rex.RexSubQuery} in the plan to a {@code + * LogicalCorrelate}, then decorrelates back to standard join shapes (LEFT_SEMI / LEFT_ANTI / + * INNER). The analytics-engine planner has no RexSubQuery handler — it routes filter / project + * expressions through a {@code ScalarFunction} table that doesn't include EXISTS / IN / SOME / + * ANY operators, so an un-removed subquery surfaces as {@code "Unrecognized filter operator + * [EXISTS / EXISTS]"} at the filter rule. The legacy engine path picks this up implicitly inside + * Calcite's {@code RelRunner.prepareStatement}, which is skipped on the analytics route. + * + * <p>Scoped to the analytics path only — placing this on {@link AnalyticsExecutionEngine} (rather + * than on the central {@code UnifiedQueryPlanner}) keeps the legacy path's RelNode pipeline + * untouched. + * + * @param plan logical plan that may contain {@link RexSubQuery} expressions + * @param context planning context; its {@code relBuilder} is passed to {@link RelDecorrelator} + * @return {@code plan} unchanged when it holds no subquery, otherwise the decorrelated plan + */ + private static RelNode removeSubQueries(RelNode plan, CalcitePlanContext context) { + // Fast-path: skip HEP + decorrelation entirely if the plan has no RexSubQuery. Avoids HEP + // overhead for the typical query and keeps the rewrite a no-op for callers (incl. unit tests) + // whose RelNodes don't have a real RelOptCluster wired up. + if (!containsSubQuery(plan)) { + return plan; + } + HepPlanner planner = new HepPlanner(SUBQUERY_REMOVE_PROGRAM); + planner.setRoot(plan); + RelNode withCorrelates = planner.findBestExp(); + return RelDecorrelator.decorrelateQuery(withCorrelates, context.relBuilder); + } + + /** Walks {@code plan} and returns {@code true} as soon as any {@link RexSubQuery} is found. */ + static boolean containsSubQuery(RelNode plan) { + boolean[] found = {false}; + RexShuttle rexFinder = + new RexShuttle() { + @Override + public org.apache.calcite.rex.RexNode visitSubQuery(RexSubQuery sub) { + found[0] = true; + return sub; + } + }; + plan.accept( + new RelHomogeneousShuttle() { + @Override + public RelNode visit(RelNode node) { + if (found[0]) return node; + node.accept(rexFinder); + return found[0] ? node : super.visit(node); + } + }); + return found[0]; + } }
diff --git a/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java b/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java index 4de596fb375..9fcd7fa1d14 100644 --- a/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java +++ b/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java
@@ -6,6 +6,7 @@ package org.opensearch.sql.executor.analytics; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any;
@@ -20,9 +21,12 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelShuttle; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rex.RexShuttle; +import org.apache.calcite.rex.RexSubQuery; import org.apache.calcite.sql.type.SqlTypeFactoryImpl; import org.apache.calcite.sql.type.SqlTypeName; import org.junit.jupiter.api.BeforeEach;
@@ -276,6 +280,55 @@ void physicalPlanExplain_callsOnFailure() { + errorRef.get().getMessage()); } + // --- Subquery-removal discriminator (see {@link AnalyticsExecutionEngine#containsSubQuery}) --- + // + // {@code containsSubQuery} gates the SubQueryRemoveRule + RelDecorrelator chain we run inside + // {@code execute()} / {@code explain()}. The chain is Calcite-tested, but the *discriminator* + // is plugin code; if it falsely returns true for a vanilla mock RelNode the existing + // execute-path tests in this class break with "NullPointerException: cluster" inside HepPlanner. + // If it falsely returns false on a plan that does contain a RexSubQuery, the EXISTS-subquery + // regression we shipped this fix for would silently reappear. + + @Test + void containsSubQuery_falseForPlanWithoutSubQuery() { + // A plain mocked RelNode has no RexNodes — Mockito default returns null from accept(...), + // which keeps the shuttle from ever reaching visitSubQuery. + RelNode plan = mock(RelNode.class); + + assertFalse( + AnalyticsExecutionEngine.containsSubQuery(plan), + "containsSubQuery should report false on a plan with no RexSubQuery — otherwise the" + + " HepPlanner / RelDecorrelator path would fire on plans that have no subqueries to" + + " remove (and on tests that pass mocked RelNodes without a real cluster)."); + } + + @Test + void containsSubQuery_trueWhenRelNodeExposesRexSubQueryDuringTraversal() { + // Stub the RelNode.accept(RexShuttle) call to feed a RexSubQuery into the shuttle, mirroring + // what a real LogicalFilter(condition=[EXISTS(subq)]) would do during traversal. + RelNode plan = mock(RelNode.class); + RexSubQuery subQuery = mock(RexSubQuery.class); + when(plan.accept(any(RelShuttle.class))) + .thenAnswer( + inv -> { + RelShuttle shuttle = inv.getArgument(0); + return shuttle.visit(plan); + }); + when(plan.accept(any(RexShuttle.class))) + .thenAnswer( + inv -> { + RexShuttle rex = inv.getArgument(0); + rex.visitSubQuery(subQuery); + return null; + }); + + assertTrue( + AnalyticsExecutionEngine.containsSubQuery(plan), + "containsSubQuery should detect a RexSubQuery surfaced through the node's" + + " accept(RexShuttle) — the discriminator is what gates the EXISTS/IN/SOME/ANY" + + " removal pass."); + } + // --- helpers --- private QueryResponse executeAndCapture(RelNode relNode) {