Skip to content

Commit 40cb73b

Browse files
authored
Add explain support for analytics engine path (#5275)
* [Mustang] Add explain support and integration tests for analytics path (#5247) Add explain support for the analytics engine path: - AnalyticsExecutionEngine.explain(RelNode, ExplainMode, ...): returns logical plan via RelOptUtil.toString() with mode-aware SqlExplainLevel (SIMPLE/STANDARD/COST). Physical and extended plans are null since the analytics engine is not yet available. - RestUnifiedQueryAction.explain(): new entry point for explain requests, delegates to AnalyticsExecutionEngine.explain() with ExplainMode.STANDARD. Response formatted via JsonResponseFormatter with normalizeLf(). - TransportPPLQueryAction: routes explain requests for analytics indices to unifiedQueryHandler.explain() instead of unifiedQueryHandler.execute(). Integration tests (AnalyticsPPLIT): - testExplainResponseStructure: verifies JSON shape (calcite.logical, calcite.physical=null, calcite.extended=null) - testExplainProjectAndScan: LogicalProject + LogicalTableScan + table name - testExplainFilterPlan: LogicalFilter with condition value - testExplainAggregationPlan: LogicalAggregate with COUNT() - testExplainSortPlan: LogicalSort Unit tests (AnalyticsExecutionEngineTest): - explainRelNode_returnsLogicalPlan - explainRelNode_errorPropagation Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Add AnalyticsExplainIT with expected output files Add dedicated explain integration test class following the CalciteExplainIT pattern: each test compares actual explain JSON against expected output files in resources/expectedOutput/analytics/. Tests cover simple scan, project, filter+project, aggregation, sort (with collation propagation to LogicalSystemLimit), and eval. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Fix explain unit test to verify actual logical plan content Use a real Calcite LogicalValues node instead of a mock so RelOptUtil.toString() produces a deterministic plan. Assert the exact expected logical plan string instead of just checking non-null. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Remove explain unit tests in favor of AnalyticsExplainIT Explain is thoroughly covered by AnalyticsExplainIT with expected output file comparison (6 tests). Remove redundant explain unit tests and unused captureExplainListener helper. Execute unit tests unchanged. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Remove explain tests from AnalyticsPPLIT in favor of AnalyticsExplainIT Explain is covered by AnalyticsExplainIT with expected output file comparison (6 tests). Remove redundant explain tests and extractLogicalPlan helper from AnalyticsPPLIT. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Switch explain tests to YAML format with format-aware listener - Add YAML format support to createExplainListener() by checking pplRequest.getFormat(), matching TransportPPLQueryAction pattern - Switch AnalyticsExplainIT from explainQueryToString (JSON) to explainQueryYaml (YAML) with assertYamlEqualsIgnoreId - Replace JSON expected files with YAML expected files - YAML serializer omits null fields (physical/extended), so expected files only contain the logical plan Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Use request explain mode instead of hardcoded STANDARD Pass pplRequest.mode() to analyticsEngine.explain() so the user's ?mode= parameter (simple, standard, cost, extended) is respected. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Fix normalizeLf inconsistency with existing PPL explain path Only apply normalizeLf() for YAML format, return response directly for JSON format. Matches TransportPPLQueryAction.createExplainResponseListener() which uses normalizeLf for YAML but returns response as-is for JSON. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Add toExplainLevel() to ExplainMode enum Encapsulate the ExplainMode to SqlExplainLevel mapping in the enum itself instead of repeating ternary logic in each execution engine. AnalyticsExecutionEngine now uses mode.toExplainLevel() directly. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Remove logging from AnalyticsExplainIT Remove LOG.info calls and Logger field. The YAML comparison assertion already provides clear output on failure. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Remove explain coverage comments Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Reuse TransportPPLQueryAction.createExplainResponseListener for explain formatting Remove duplicate createExplainListener from RestUnifiedQueryAction. explain() now returns ExplainResponse via ResponseListener, and TransportPPLQueryAction wraps it with its existing createExplainResponseListener for format-aware (JSON/YAML) formatting. This avoids duplicating the format selection logic. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> --------- Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 23227fd commit 40cb73b

11 files changed

Lines changed: 200 additions & 4 deletions

File tree

core/src/main/java/org/opensearch/sql/ast/statement/ExplainMode.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.util.Locale;
99
import lombok.Getter;
1010
import lombok.RequiredArgsConstructor;
11+
import org.apache.calcite.sql.SqlExplainLevel;
1112

1213
@RequiredArgsConstructor
1314
public enum ExplainMode {
@@ -26,4 +27,13 @@ public static ExplainMode of(String mode) {
2627
return ExplainMode.STANDARD;
2728
}
2829
}
30+
31+
/** Convert to Calcite SqlExplainLevel for RelOptUtil.toString(). */
32+
public SqlExplainLevel toExplainLevel() {
33+
return switch (this) {
34+
case SIMPLE -> SqlExplainLevel.NO_ATTRIBUTES;
35+
case COST -> SqlExplainLevel.ALL_ATTRIBUTES;
36+
default -> SqlExplainLevel.EXPPLAN_ATTRIBUTES;
37+
};
38+
}
2939
}

core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99
import java.util.LinkedHashMap;
1010
import java.util.List;
1111
import java.util.Map;
12+
import org.apache.calcite.plan.RelOptUtil;
1213
import org.apache.calcite.rel.RelNode;
1314
import org.apache.calcite.rel.type.RelDataType;
1415
import org.apache.calcite.rel.type.RelDataTypeField;
16+
import org.opensearch.sql.ast.statement.ExplainMode;
1517
import org.opensearch.sql.calcite.CalcitePlanContext;
1618
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
1719
import org.opensearch.sql.common.response.ResponseListener;
@@ -77,6 +79,20 @@ public void execute(
7779
}
7880
}
7981

82+
@Override
83+
public void explain(
84+
RelNode plan,
85+
ExplainMode mode,
86+
CalcitePlanContext context,
87+
ResponseListener<ExplainResponse> listener) {
88+
try {
89+
String logical = RelOptUtil.toString(plan, mode.toExplainLevel());
90+
listener.onResponse(new ExplainResponse(new ExplainResponseNodeV2(logical, null, null)));
91+
} catch (Exception e) {
92+
listener.onFailure(e);
93+
}
94+
}
95+
8096
private List<ExprValue> convertRows(Iterable<Object[]> rows, List<RelDataTypeField> fields) {
8197
List<ExprValue> results = new ArrayList<>();
8298
for (Object[] row : rows) {
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ppl;
7+
8+
import static org.opensearch.sql.util.MatcherUtils.assertYamlEqualsIgnoreId;
9+
10+
import java.io.IOException;
11+
import org.junit.Test;
12+
13+
/**
14+
* Explain integration tests for queries routed through the analytics engine path (Project Mustang).
15+
* Validates that PPL queries targeting "parquet_*" indices produce correct logical plans via the
16+
* _plugins/_ppl/_explain endpoint.
17+
*
18+
* <p>Expected output files are in resources/expectedOutput/analytics/. Each test compares the
19+
* explain YAML output against its expected file, following the same pattern as CalciteExplainIT.
20+
*
21+
* <p>Since the analytics engine is not yet available, physical and extended plans are null. Only
22+
* the logical plan (Calcite RelNode tree) is verified.
23+
*/
24+
public class AnalyticsExplainIT extends PPLIntegTestCase {
25+
26+
@Override
27+
protected void init() throws Exception {
28+
// No index loading needed -- stub schema and data are hardcoded
29+
}
30+
31+
private String loadAnalyticsExpectedPlan(String fileName) {
32+
return loadFromFile("expectedOutput/analytics/" + fileName);
33+
}
34+
35+
@Test
36+
public void testExplainSimpleScan() throws IOException {
37+
assertYamlEqualsIgnoreId(
38+
loadAnalyticsExpectedPlan("explain_simple_scan.yaml"),
39+
explainQueryYaml("source = opensearch.parquet_logs"));
40+
}
41+
42+
@Test
43+
public void testExplainProject() throws IOException {
44+
assertYamlEqualsIgnoreId(
45+
loadAnalyticsExpectedPlan("explain_project.yaml"),
46+
explainQueryYaml("source = opensearch.parquet_logs | fields ts, message"));
47+
}
48+
49+
@Test
50+
public void testExplainFilterAndProject() throws IOException {
51+
assertYamlEqualsIgnoreId(
52+
loadAnalyticsExpectedPlan("explain_filter_project.yaml"),
53+
explainQueryYaml(
54+
"source = opensearch.parquet_logs | where status = 200 | fields ts, message"));
55+
}
56+
57+
@Test
58+
public void testExplainAggregation() throws IOException {
59+
assertYamlEqualsIgnoreId(
60+
loadAnalyticsExpectedPlan("explain_aggregation.yaml"),
61+
explainQueryYaml("source = opensearch.parquet_logs | stats count() by status"));
62+
}
63+
64+
@Test
65+
public void testExplainSort() throws IOException {
66+
assertYamlEqualsIgnoreId(
67+
loadAnalyticsExpectedPlan("explain_sort.yaml"),
68+
explainQueryYaml("source = opensearch.parquet_logs | sort ts"));
69+
}
70+
71+
@Test
72+
public void testExplainEval() throws IOException {
73+
assertYamlEqualsIgnoreId(
74+
loadAnalyticsExpectedPlan("explain_eval.yaml"),
75+
explainQueryYaml("source = opensearch.parquet_logs | eval error = status = 500"));
76+
}
77+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalProject(count()=[$1], status=[$0])
5+
LogicalAggregate(group=[{0}], count()=[COUNT()])
6+
LogicalProject(status=[$1])
7+
LogicalTableScan(table=[[opensearch, parquet_logs]])
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalProject(ts=[$0], status=[$1], message=[$2], ip_addr=[$3], error=[=($1, 500)])
5+
LogicalTableScan(table=[[opensearch, parquet_logs]])
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalProject(ts=[$0], message=[$2])
5+
LogicalFilter(condition=[=($1, 200)])
6+
LogicalTableScan(table=[[opensearch, parquet_logs]])
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalProject(ts=[$0], message=[$2])
5+
LogicalTableScan(table=[[opensearch, parquet_logs]])
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalTableScan(table=[[opensearch, parquet_logs]])
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(sort0=[$0], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalSort(sort0=[$0], dir0=[ASC-nulls-first])
5+
LogicalTableScan(table=[[opensearch, parquet_logs]])

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.sql.plugin.rest;
77

8+
import static org.opensearch.sql.executor.ExecutionEngine.ExplainResponse;
89
import static org.opensearch.sql.lang.PPLLangSpec.PPL_SPEC;
910
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;
1011
import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY;
@@ -72,6 +73,7 @@ public static boolean isAnalyticsIndex(String query) {
7273
* @param pplRequest the original PPL request
7374
* @param listener the transport action listener
7475
*/
76+
/** Execute a query through the unified query pipeline on the sql-worker thread pool. */
7577
public void execute(
7678
String query,
7779
QueryType queryType,
@@ -85,6 +87,24 @@ public void execute(
8587
SQL_WORKER_THREAD_POOL_NAME);
8688
}
8789

90+
/**
91+
* Explain a query through the unified query pipeline on the sql-worker thread pool. Returns
92+
* ExplainResponse via ResponseListener so the caller (TransportPPLQueryAction) can format it
93+
* using its own createExplainResponseListener, reusing the existing format-aware logic.
94+
*/
95+
public void explain(
96+
String query,
97+
QueryType queryType,
98+
PPLQueryRequest pplRequest,
99+
ResponseListener<ExplainResponse> listener) {
100+
client
101+
.threadPool()
102+
.schedule(
103+
withCurrentContext(() -> doExplain(query, queryType, pplRequest, listener)),
104+
new TimeValue(0),
105+
SQL_WORKER_THREAD_POOL_NAME);
106+
}
107+
88108
private void doExecute(
89109
String query,
90110
QueryType queryType,
@@ -104,8 +124,6 @@ private void doExecute(
104124
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
105125
RelNode plan = planner.plan(query);
106126

107-
// Add query size limit to the plan so the analytics engine can enforce it
108-
// during execution, consistent with PPL V3 (see QueryService.convertToCalcitePlan)
109127
CalcitePlanContext planContext = context.getPlanContext();
110128
plan = addQuerySizeLimit(plan, planContext);
111129

@@ -123,6 +141,41 @@ private void doExecute(
123141
}
124142
}
125143

144+
private void doExplain(
145+
String query,
146+
QueryType queryType,
147+
PPLQueryRequest pplRequest,
148+
ResponseListener<ExplainResponse> listener) {
149+
try {
150+
long startTime = System.nanoTime();
151+
AbstractSchema schema = StubSchemaProvider.buildSchema();
152+
153+
try (UnifiedQueryContext context =
154+
UnifiedQueryContext.builder()
155+
.language(queryType)
156+
.catalog(SCHEMA_NAME, schema)
157+
.defaultNamespace(SCHEMA_NAME)
158+
.build()) {
159+
160+
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
161+
RelNode plan = planner.plan(query);
162+
163+
CalcitePlanContext planContext = context.getPlanContext();
164+
plan = addQuerySizeLimit(plan, planContext);
165+
166+
long planTime = System.nanoTime();
167+
LOG.info(
168+
"[unified] Planning completed in {}ms for {} query",
169+
(planTime - startTime) / 1_000_000,
170+
queryType);
171+
172+
analyticsEngine.explain(plan, pplRequest.mode(), planContext, listener);
173+
}
174+
} catch (Exception e) {
175+
listener.onFailure(e);
176+
}
177+
}
178+
126179
/**
127180
* Add a system-level query size limit to the plan. This ensures the analytics engine enforces the
128181
* limit during execution rather than returning all rows for post-processing truncation.

0 commit comments

Comments
 (0)