Skip to content

Commit a91d349

Browse files
committed
Integrate SQL REST endpoint with analytics engine path
Add SQL query routing through the analytics engine for Parquet-backed indices. SQL queries targeting "parquet_*" indices are routed to RestUnifiedQueryAction via the unified query pipeline (Calcite SQL parser -> UnifiedQueryPlanner -> AnalyticsExecutionEngine).

Changes:
- Add SqlNode table extraction in RestUnifiedQueryAction.extractIndexName() to support SQL query routing (handles SqlSelect -> SqlIdentifier)
- Add executeSql() and explainSql() methods in RestUnifiedQueryAction for SQL queries (parallel to existing PPL execute/explain)
- Add analytics routing in RestSqlAction via optional BiFunction router that checks isAnalyticsIndex before delegating to SQLService
- Wire the router in SQLPlugin.createSqlAnalyticsRouter()
- Non-analytics SQL queries fall through to the existing V2 engine
- Add AnalyticsSQLIT integration tests: SELECT *, column projection, explain, non-parquet fallback, syntax error handling

Resolves: #5248

Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 24dd79c commit a91d349

5 files changed

Lines changed: 382 additions & 3 deletions

File tree

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.sql;
7+
8+
import static org.hamcrest.Matchers.containsString;
9+
import static org.opensearch.sql.legacy.TestUtils.getResponseBody;
10+
import static org.opensearch.sql.legacy.TestUtils.isIndexExist;
11+
12+
import java.io.IOException;
13+
import org.json.JSONObject;
14+
import org.junit.Test;
15+
import org.opensearch.client.Request;
16+
import org.opensearch.client.RequestOptions;
17+
import org.opensearch.client.Response;
18+
import org.opensearch.sql.legacy.SQLIntegTestCase;
19+
20+
/**
21+
* Explain integration tests for SQL queries routed through the analytics engine path (Project
22+
* Mustang). Validates that SQL queries targeting "parquet_*" indices produce correct logical plans
23+
* via the _plugins/_sql/_explain endpoint.
24+
*/
25+
public class AnalyticsSQLExplainIT extends SQLIntegTestCase {
26+
27+
@Override
28+
protected void init() throws Exception {
29+
if (!isIndexExist(client(), "parquet_logs")) {
30+
Request request = new Request("PUT", "/parquet_logs");
31+
request.setJsonEntity(
32+
"{"
33+
+ "\"mappings\": {"
34+
+ " \"properties\": {"
35+
+ " \"ts\": {\"type\": \"date\"},"
36+
+ " \"status\": {\"type\": \"integer\"},"
37+
+ " \"message\": {\"type\": \"keyword\"},"
38+
+ " \"ip_addr\": {\"type\": \"keyword\"}"
39+
+ " }"
40+
+ "}"
41+
+ "}");
42+
client().performRequest(request);
43+
}
44+
}
45+
46+
private JSONObject explainSqlQuery(String sql) throws IOException {
47+
Request request = new Request("POST", "/_plugins/_sql/_explain");
48+
request.setJsonEntity("{\"query\": \"" + sql + "\"}");
49+
request.setOptions(RequestOptions.DEFAULT);
50+
Response response = client().performRequest(request);
51+
return new JSONObject(getResponseBody(response));
52+
}
53+
54+
private String getLogicalPlan(JSONObject explainResult) {
55+
return explainResult.getJSONObject("calcite").getString("logical");
56+
}
57+
58+
@Test
59+
public void testExplainSelectStar() throws IOException {
60+
JSONObject result = explainSqlQuery("SELECT * FROM parquet_logs");
61+
String logical = getLogicalPlan(result);
62+
assertThat(logical, containsString("LogicalSystemLimit"));
63+
assertThat(logical, containsString("LogicalTableScan"));
64+
assertThat(logical, containsString("parquet_logs"));
65+
}
66+
67+
@Test
68+
public void testExplainSelectColumns() throws IOException {
69+
JSONObject result = explainSqlQuery("SELECT ts, status FROM parquet_logs");
70+
String logical = getLogicalPlan(result);
71+
assertThat(logical, containsString("LogicalProject"));
72+
assertThat(logical, containsString("LogicalTableScan"));
73+
assertThat(logical, containsString("parquet_logs"));
74+
}
75+
76+
@Test
77+
public void testExplainSelectWithWhere() throws IOException {
78+
JSONObject result = explainSqlQuery("SELECT ts, message FROM parquet_logs WHERE status = 200");
79+
String logical = getLogicalPlan(result);
80+
assertThat(logical, containsString("LogicalFilter"));
81+
assertThat(logical, containsString("LogicalProject"));
82+
assertThat(logical, containsString("LogicalTableScan"));
83+
assertThat(logical, containsString("parquet_logs"));
84+
}
85+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.sql;
7+
8+
import static org.opensearch.sql.legacy.TestUtils.getResponseBody;
9+
import static org.opensearch.sql.legacy.TestUtils.isIndexExist;
10+
import static org.opensearch.sql.util.MatcherUtils.rows;
11+
import static org.opensearch.sql.util.MatcherUtils.schema;
12+
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
13+
import static org.opensearch.sql.util.MatcherUtils.verifySchema;
14+
15+
import java.io.IOException;
16+
import org.json.JSONObject;
17+
import org.junit.Test;
18+
import org.opensearch.client.Request;
19+
import org.opensearch.client.RequestOptions;
20+
import org.opensearch.client.Response;
21+
import org.opensearch.client.ResponseException;
22+
import org.opensearch.sql.legacy.SQLIntegTestCase;
23+
24+
/**
25+
* Integration tests for SQL queries routed through the analytics engine path (Project Mustang).
26+
* Queries targeting "parquet_*" indices are routed to {@code RestUnifiedQueryAction} which uses
27+
* {@code AnalyticsExecutionEngine} with a stub {@code QueryPlanExecutor}.
28+
*
29+
* <p>The stub executor returns rows in a fixed order [ts, status, message, ip_addr] regardless of
30+
* the plan. The schema from OpenSearchSchemaBuilder is alphabetical [ip_addr, message, status, ts].
31+
* AnalyticsExecutionEngine maps values by position, so the data values appear mismatched. This is
32+
* expected; the real analytics engine will evaluate the plan correctly.
33+
*/
34+
public class AnalyticsSQLIT extends SQLIntegTestCase {
35+
36+
@Override
37+
protected void init() throws Exception {
38+
createParquetLogsIndex();
39+
}
40+
41+
private void createParquetLogsIndex() throws IOException {
42+
if (isIndexExist(client(), "parquet_logs")) {
43+
return;
44+
}
45+
Request request = new Request("PUT", "/parquet_logs");
46+
request.setJsonEntity(
47+
"{"
48+
+ "\"mappings\": {"
49+
+ " \"properties\": {"
50+
+ " \"ts\": {\"type\": \"date\"},"
51+
+ " \"status\": {\"type\": \"integer\"},"
52+
+ " \"message\": {\"type\": \"keyword\"},"
53+
+ " \"ip_addr\": {\"type\": \"keyword\"}"
54+
+ " }"
55+
+ "}"
56+
+ "}");
57+
client().performRequest(request);
58+
}
59+
60+
private JSONObject executeSqlQuery(String sql) throws IOException {
61+
Request request = new Request("POST", "/_plugins/_sql");
62+
request.setJsonEntity("{\"query\": \"" + sql + "\"}");
63+
request.setOptions(RequestOptions.DEFAULT);
64+
Response response = client().performRequest(request);
65+
return new JSONObject(getResponseBody(response));
66+
}
67+
68+
@Test
69+
public void testSelectStarSchemaAndData() throws IOException {
70+
JSONObject result = executeSqlQuery("SELECT * FROM parquet_logs");
71+
verifySchema(
72+
result,
73+
schema("ip_addr", "string"),
74+
schema("message", "string"),
75+
schema("status", "integer"),
76+
schema("ts", "timestamp"));
77+
// Stub returns [ts, status, message, ip_addr] per row, mapped by position to
78+
// [ip_addr, message, status, ts] schema. Values appear mismatched — expected with stub.
79+
verifyDataRows(
80+
result,
81+
rows("2024-01-15 10:30:00", 200, "Request completed", "192.168.1.1"),
82+
rows("2024-01-15 10:31:00", 200, "Health check OK", "192.168.1.2"),
83+
rows("2024-01-15 10:32:00", 500, "Internal server error", "192.168.1.3"));
84+
}
85+
86+
@Test
87+
public void testSelectSpecificColumns() throws IOException {
88+
JSONObject result = executeSqlQuery("SELECT status, message FROM parquet_logs");
89+
verifySchema(result, schema("status", "integer"), schema("message", "string"));
90+
verifyDataRows(
91+
result,
92+
rows("2024-01-15 10:30:00", 200),
93+
rows("2024-01-15 10:31:00", 200),
94+
rows("2024-01-15 10:32:00", 500));
95+
}
96+
97+
@Test(expected = ResponseException.class)
98+
public void testSyntaxError() throws IOException {
99+
executeSqlQuery("SELEC * FROM parquet_logs");
100+
}
101+
}

legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Map;
2020
import java.util.Optional;
2121
import java.util.Set;
22+
import java.util.function.BiFunction;
2223
import java.util.function.Predicate;
2324
import java.util.regex.Pattern;
2425
import org.apache.logging.log4j.LogManager;
@@ -83,10 +84,25 @@ public class RestSqlAction extends BaseRestHandler {
8384
/** New SQL query request handler. */
8485
private final RestSQLQueryAction newSqlQueryHandler;
8586

87+
/**
88+
* Optional analytics router. If set, it's called before the normal SQL engine. Accepts the
89+
* request and channel, returns {@code true} if it handled the request, {@code false} to fall
90+
* through to normal SQL engine.
91+
*/
92+
private final BiFunction<SQLQueryRequest, RestChannel, Boolean> analyticsRouter;
93+
8694
/**
 * Creates the handler without an analytics router by delegating to the three-argument
 * constructor with a {@code null} router.
 *
 * @param settings settings forwarded to the three-argument constructor
 * @param injector injector forwarded to the three-argument constructor
 */
public RestSqlAction(Settings settings, Injector injector) {
95+
this(settings, injector, null);
96+
}
97+
98+
/**
 * Creates the handler with an optional analytics router.
 *
 * @param settings settings from which {@code MULTI_ALLOW_EXPLICIT_INDEX} is read
 * @param injector injector used to construct the {@link RestSQLQueryAction} handler
 * @param analyticsRouter router stored for use before the normal SQL engine; may be {@code null}
 *     (the two-argument constructor passes {@code null}), in which case request handling skips
 *     the analytics routing step
 */
public RestSqlAction(
99+
Settings settings,
100+
Injector injector,
101+
BiFunction<SQLQueryRequest, RestChannel, Boolean> analyticsRouter) {
87102
super();
88103
this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
89104
this.newSqlQueryHandler = new RestSQLQueryAction(injector);
105+
// Stored as-is; no null check here because a null router is a supported configuration.
this.analyticsRouter = analyticsRouter;
90106
}
91107

92108
@Override
@@ -134,14 +150,44 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
134150

135151
Format format = SqlRequestParam.getFormat(request.params());
136152

137-
// Route request to new query engine if it's supported already
138153
SQLQueryRequest newSqlRequest =
139154
new SQLQueryRequest(
140155
sqlRequest.getJsonContent(),
141156
sqlRequest.getSql(),
142157
request.path(),
143158
request.params(),
144159
sqlRequest.cursor());
160+
161+
// Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices.
162+
// The router returns true and sends the response directly if it handled the request.
163+
if (analyticsRouter != null) {
164+
final SQLQueryRequest finalRequest = newSqlRequest;
165+
return channel -> {
166+
if (!analyticsRouter.apply(finalRequest, channel)) {
167+
// Not an analytics query — delegate to normal SQL engine
168+
try {
169+
newSqlQueryHandler
170+
.prepareRequest(
171+
finalRequest,
172+
(ch, ex) -> {
173+
try {
174+
Format fmt = SqlRequestParam.getFormat(request.params());
175+
QueryAction qa = explainRequest(client, sqlRequest, fmt);
176+
executeSqlRequest(request, qa, client, ch);
177+
} catch (Exception e) {
178+
handleException(ch, e);
179+
}
180+
},
181+
this::handleException)
182+
.accept(channel);
183+
} catch (Exception e) {
184+
handleException(channel, e);
185+
}
186+
}
187+
};
188+
}
189+
190+
// Route request to new query engine if it's supported already
145191
return newSqlQueryHandler.prepareRequest(
146192
newSqlRequest,
147193
(restChannel, exception) -> {

0 commit comments

Comments
 (0)