Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,17 @@
import java.util.List;
import java.util.Map;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.CoreRules;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.rex.RexSubQuery;
import org.apache.calcite.sql2rel.RelDecorrelator;
import org.opensearch.analytics.exec.QueryPlanExecutor;
import org.opensearch.core.action.ActionListener;
import org.opensearch.sql.ast.statement.ExplainMode;
Expand Down Expand Up @@ -77,15 +85,16 @@ public void execute(
// taken by SimpleJsonResponseFormatter sees the correct value.
ProfileMetric execMetric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE);
long execStart = System.nanoTime();
final RelNode executablePlan = removeSubQueries(plan, context);

planExecutor.execute(
plan,
executablePlan,
null,
new ActionListener<>() {
@Override
public void onResponse(Iterable<Object[]> rows) {
try {
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
List<RelDataTypeField> fields = executablePlan.getRowType().getFieldList();
List<ExprValue> results = convertRows(rows, fields);
Schema schema = buildSchema(fields);
execMetric.set(System.nanoTime() - execStart);
Expand All @@ -109,6 +118,7 @@ public void explain(
CalcitePlanContext context,
ResponseListener<ExplainResponse> listener) {
try {
plan = removeSubQueries(plan, context);
String logical = RelOptUtil.toString(plan, mode.toExplainLevel());
ExplainResponse response =
new ExplainResponse(new ExplainResponseNodeV2(logical, null, null));
Expand Down Expand Up @@ -148,4 +158,60 @@ private ExprType convertType(RelDataType type) {
return org.opensearch.sql.data.type.ExprCoreType.UNKNOWN;
}
}

/**
 * HEP program that rewrites every {@link org.apache.calcite.rex.RexSubQuery} in a plan into a
 * {@code LogicalCorrelate}. {@link #removeSubQueries} runs this program and then hands the result
 * to {@link RelDecorrelator}, which turns the correlates back into standard join shapes
 * (LEFT_SEMI / LEFT_ANTI / INNER).
 *
 * <p>The analytics-engine planner has no RexSubQuery handler — it routes filter / project
 * expressions through a {@code ScalarFunction} table that doesn't include EXISTS / IN / SOME /
 * ANY operators, so an un-removed subquery surfaces as {@code "Unrecognized filter operator
 * [EXISTS / EXISTS]"} at the filter rule. The legacy engine path picks this up implicitly inside
 * Calcite's {@code RelRunner.prepareStatement}, which is skipped on the analytics route.
 *
 * <p>Scoped to the analytics path only — placing this on {@link AnalyticsExecutionEngine} (rather
 * than on the central {@code UnifiedQueryPlanner}) keeps the legacy path's RelNode pipeline
 * untouched.
 *
 * <p>The three rules are registered as a single rule collection rather than as three separate
 * rule instances. Separate instances run as sequential program instructions, so a sub-query that
 * only becomes visible to the planner after a later rule has inlined its enclosing sub-query
 * (for example a filter EXISTS nested inside a projected scalar sub-query) would never be
 * revisited by the earlier rule. A collection lets all three rules fire until no
 * {@code RexSubQuery} is left.
 */
private static final HepProgram SUBQUERY_REMOVE_PROGRAM =
    new HepProgramBuilder()
        .addRuleCollection(
            List.of(
                CoreRules.FILTER_SUB_QUERY_TO_CORRELATE,
                CoreRules.PROJECT_SUB_QUERY_TO_CORRELATE,
                CoreRules.JOIN_SUB_QUERY_TO_CORRELATE))
        .build();

/**
 * Returns a plan equivalent to {@code plan} with its {@link RexSubQuery} expressions removed.
 *
 * <p>When {@link #containsSubQuery} reports no sub-query the input is returned untouched.
 * Otherwise the plan is run through {@link #SUBQUERY_REMOVE_PROGRAM} on a fresh {@link
 * HepPlanner} and the result is passed to {@link RelDecorrelator#decorrelateQuery}.
 *
 * @param plan the logical plan to rewrite
 * @param context planning context whose {@code relBuilder} is handed to the decorrelator
 * @return {@code plan} itself when it has no sub-query, otherwise the rewritten plan
 */
private static RelNode removeSubQueries(RelNode plan, CalcitePlanContext context) {
  if (containsSubQuery(plan)) {
    HepPlanner hepPlanner = new HepPlanner(SUBQUERY_REMOVE_PROGRAM);
    hepPlanner.setRoot(plan);
    RelNode correlated = hepPlanner.findBestExp();
    return RelDecorrelator.decorrelateQuery(correlated, context.relBuilder);
  }
  // Fast path: nothing to rewrite, so HEP and decorrelation are skipped entirely. This avoids
  // planner overhead for the typical query and keeps the rewrite a no-op for callers (incl.
  // unit tests) whose RelNodes don't have a real RelOptCluster wired up.
  return plan;
}

/**
 * Reports whether {@code plan} contains a {@link RexSubQuery}.
 *
 * <p>Each visited node's expressions are offered to a {@link RexShuttle} that raises a flag on
 * the first sub-query it sees; once the flag is up, no further nodes are descended into.
 *
 * @param plan root of the plan to inspect
 * @return {@code true} if a {@link RexSubQuery} was encountered during the walk
 */
static boolean containsSubQuery(RelNode plan) {
  // Single-element array so the anonymous classes below can flip the flag.
  final boolean[] seen = {false};

  final RexShuttle subQueryDetector =
      new RexShuttle() {
        @Override
        public org.apache.calcite.rex.RexNode visitSubQuery(RexSubQuery subQuery) {
          seen[0] = true;
          return subQuery;
        }
      };

  final RelHomogeneousShuttle planWalker =
      new RelHomogeneousShuttle() {
        @Override
        public RelNode visit(RelNode node) {
          if (!seen[0]) {
            // Inspect this node's own expressions before deciding whether to go deeper.
            node.accept(subQueryDetector);
          }
          if (seen[0]) {
            // Already found one: stop descending.
            return node;
          }
          return super.visit(node);
        }
      };

  plan.accept(planWalker);
  return seen[0];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.executor.analytics;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -20,9 +21,12 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttle;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.rex.RexSubQuery;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -276,6 +280,55 @@ void physicalPlanExplain_callsOnFailure() {
+ errorRef.get().getMessage());
}

// --- Subquery-removal discriminator (see {@link AnalyticsExecutionEngine#containsSubQuery}) ---
//
// {@code containsSubQuery} gates the SubQueryRemoveRule + RelDecorrelator chain we run inside
// {@code execute()} / {@code explain()}. The chain is Calcite-tested, but the *discriminator*
// is plugin code; if it falsely returns true for a vanilla mock RelNode the existing
// execute-path tests in this class break with "NullPointerException: cluster" inside HepPlanner.
// If it falsely returns false on a plan that does contain a RexSubQuery, the EXISTS-subquery
// regression we shipped this fix for would silently reappear.

@Test
void containsSubQuery_falseForPlanWithoutSubQuery() {
  // An unstubbed mock exposes no RexNodes: Mockito's default answer for accept(...) is null, so
  // the shuttle's visitSubQuery is never invoked.
  RelNode bareNode = mock(RelNode.class);

  boolean detected = AnalyticsExecutionEngine.containsSubQuery(bareNode);

  assertFalse(
      detected,
      "containsSubQuery should report false on a plan with no RexSubQuery — otherwise the"
          + " HepPlanner / RelDecorrelator path would fire on plans that have no subqueries to"
          + " remove (and on tests that pass mocked RelNodes without a real cluster).");
}

@Test
void containsSubQuery_trueWhenRelNodeExposesRexSubQueryDuringTraversal() {
  RelNode filterLikeNode = mock(RelNode.class);
  RexSubQuery existsSubQuery = mock(RexSubQuery.class);

  // Rel-level traversal: forward the shuttle back onto the node, as a real RelNode would.
  when(filterLikeNode.accept(any(RelShuttle.class)))
      .thenAnswer(call -> call.<RelShuttle>getArgument(0).visit(filterLikeNode));

  // Rex-level traversal: hand the shuttle a RexSubQuery, mirroring what a real
  // LogicalFilter(condition=[EXISTS(subq)]) would do when its expressions are visited.
  when(filterLikeNode.accept(any(RexShuttle.class)))
      .thenAnswer(
          call -> {
            call.<RexShuttle>getArgument(0).visitSubQuery(existsSubQuery);
            return null;
          });

  boolean detected = AnalyticsExecutionEngine.containsSubQuery(filterLikeNode);

  assertTrue(
      detected,
      "containsSubQuery should detect a RexSubQuery surfaced through the node's"
          + " accept(RexShuttle) — the discriminator is what gates the EXISTS/IN/SOME/ANY"
          + " removal pass.");
}

// --- helpers ---

private QueryResponse executeAndCapture(RelNode relNode) {
Expand Down
Loading