Skip to content

Commit 38710ef

Browse files
committed
Integrate SQL REST endpoint with analytics engine path
Add SQL query routing through the analytics engine for Parquet-backed indices. SQL queries targeting "parquet_*" indices are routed to RestUnifiedQueryAction via the unified query pipeline (Calcite SQL parser -> UnifiedQueryPlanner -> AnalyticsExecutionEngine). Changes: - Add SqlNode table extraction in RestUnifiedQueryAction.extractIndexName() to support SQL query routing (handles SqlSelect -> SqlIdentifier) - Add executeSql() and explainSql() methods in RestUnifiedQueryAction for SQL queries (parallel to existing PPL execute/explain) - Add analytics routing in RestSqlAction via optional BiFunction router that checks isAnalyticsIndex before delegating to SQLService - Wire the router in SQLPlugin.createSqlAnalyticsRouter() - Non-analytics SQL queries fall through to the existing V2 engine - Add AnalyticsSQLIT integration tests: SELECT *, column projection, explain, non-parquet fallback, syntax error handling Resolves: #5248 Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 24dd79c commit 38710ef

4 files changed

Lines changed: 328 additions & 3 deletions

File tree

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.sql;
7+
8+
import static org.opensearch.sql.legacy.TestUtils.getResponseBody;
9+
import static org.opensearch.sql.legacy.TestUtils.isIndexExist;
10+
import static org.opensearch.sql.util.MatcherUtils.schema;
11+
import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows;
12+
import static org.opensearch.sql.util.MatcherUtils.verifySchema;
13+
14+
import java.io.IOException;
15+
import org.json.JSONObject;
16+
import org.junit.Test;
17+
import org.opensearch.client.Request;
18+
import org.opensearch.client.RequestOptions;
19+
import org.opensearch.client.Response;
20+
import org.opensearch.client.ResponseException;
21+
import org.opensearch.sql.legacy.SQLIntegTestCase;
22+
import org.opensearch.sql.legacy.TestsConstants;
23+
24+
/**
25+
* Integration tests for SQL queries routed through the analytics engine path (Project Mustang).
26+
* Queries targeting "parquet_*" indices are routed to {@code RestUnifiedQueryAction} which uses
27+
* {@code AnalyticsExecutionEngine} with a stub {@code QueryPlanExecutor}.
28+
*
29+
* <p>These tests validate the full pipeline: REST SQL request -> analytics routing -> planning via
30+
* UnifiedQueryPlanner (Calcite SQL parser) -> execution via AnalyticsExecutionEngine -> response
31+
* formatting.
32+
*/
33+
public class AnalyticsSQLIT extends SQLIntegTestCase {
34+
35+
@Override
36+
protected void init() throws Exception {
37+
createParquetLogsIndex();
38+
}
39+
40+
private void createParquetLogsIndex() throws IOException {
41+
if (isIndexExist(client(), "parquet_logs")) {
42+
return;
43+
}
44+
Request request = new Request("PUT", "/parquet_logs");
45+
request.setJsonEntity(
46+
"{"
47+
+ "\"mappings\": {"
48+
+ " \"properties\": {"
49+
+ " \"ts\": {\"type\": \"date\"},"
50+
+ " \"status\": {\"type\": \"integer\"},"
51+
+ " \"message\": {\"type\": \"keyword\"},"
52+
+ " \"ip_addr\": {\"type\": \"keyword\"}"
53+
+ " }"
54+
+ "}"
55+
+ "}");
56+
client().performRequest(request);
57+
}
58+
59+
private JSONObject executeSqlQuery(String sql) throws IOException {
60+
Request request = new Request("POST", "/_plugins/_sql");
61+
request.setJsonEntity("{\"query\": \"" + sql + "\"}");
62+
request.setOptions(RequestOptions.DEFAULT);
63+
Response response = client().performRequest(request);
64+
return new JSONObject(getResponseBody(response));
65+
}
66+
67+
private JSONObject explainSqlQuery(String sql) throws IOException {
68+
Request request = new Request("POST", "/_plugins/_sql/_explain");
69+
request.setJsonEntity("{\"query\": \"" + sql + "\"}");
70+
request.setOptions(RequestOptions.DEFAULT);
71+
Response response = client().performRequest(request);
72+
return new JSONObject(getResponseBody(response));
73+
}
74+
75+
@Test
76+
public void testSelectStarFromParquetIndex() throws IOException {
77+
JSONObject result = executeSqlQuery("SELECT * FROM parquet_logs");
78+
verifySchema(
79+
result,
80+
schema("ip_addr", "string"),
81+
schema("message", "string"),
82+
schema("status", "integer"),
83+
schema("ts", "timestamp"));
84+
verifyNumOfRows(result, 3);
85+
}
86+
87+
@Test
88+
public void testSelectSpecificColumns() throws IOException {
89+
JSONObject result = executeSqlQuery("SELECT status, message FROM parquet_logs");
90+
verifySchema(result, schema("status", "integer"), schema("message", "string"));
91+
verifyNumOfRows(result, 3);
92+
}
93+
94+
@Test
95+
public void testExplainSelectStar() throws IOException {
96+
JSONObject result = explainSqlQuery("SELECT * FROM parquet_logs");
97+
assertTrue(result.has("calcite"));
98+
JSONObject calcite = result.getJSONObject("calcite");
99+
assertTrue(calcite.has("logical"));
100+
String logical = calcite.getString("logical");
101+
assertTrue(logical.contains("LogicalTableScan"));
102+
assertTrue(logical.contains("parquet_logs"));
103+
}
104+
105+
@Test
106+
public void testNonParquetIndexFallsThroughToV2() throws IOException {
107+
// Verify that a regular (non-parquet) SQL query still works via the normal V2 engine
108+
loadIndex(Index.ACCOUNT);
109+
JSONObject result =
110+
executeQuery("SELECT firstname FROM " + TestsConstants.TEST_INDEX_ACCOUNT + " LIMIT 1");
111+
verifyNumOfRows(result, 1);
112+
}
113+
114+
@Test
115+
public void testSyntaxError() {
116+
try {
117+
executeSqlQuery("SELEC * FROM parquet_logs");
118+
fail("Expected syntax error");
119+
} catch (ResponseException e) {
120+
assertTrue(e.getMessage().contains("400"));
121+
} catch (IOException e) {
122+
fail("Unexpected IOException: " + e.getMessage());
123+
}
124+
}
125+
}

legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,26 @@ public class RestSqlAction extends BaseRestHandler {
8383
/** New SQL query request handler. */
8484
private final RestSQLQueryAction newSqlQueryHandler;
8585

86+
/**
87+
* Optional analytics router. If set, it's called before the normal SQL engine. Accepts the
88+
* request and channel, returns {@code true} if it handled the request, {@code false} to fall
89+
* through to normal SQL engine.
90+
*/
91+
private final java.util.function.BiFunction<SQLQueryRequest, RestChannel, Boolean>
92+
analyticsRouter;
93+
8694
public RestSqlAction(Settings settings, Injector injector) {
95+
this(settings, injector, null);
96+
}
97+
98+
public RestSqlAction(
99+
Settings settings,
100+
Injector injector,
101+
java.util.function.BiFunction<SQLQueryRequest, RestChannel, Boolean> analyticsRouter) {
87102
super();
88103
this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
89104
this.newSqlQueryHandler = new RestSQLQueryAction(injector);
105+
this.analyticsRouter = analyticsRouter;
90106
}
91107

92108
@Override
@@ -134,14 +150,44 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
134150

135151
Format format = SqlRequestParam.getFormat(request.params());
136152

137-
// Route request to new query engine if it's supported already
138153
SQLQueryRequest newSqlRequest =
139154
new SQLQueryRequest(
140155
sqlRequest.getJsonContent(),
141156
sqlRequest.getSql(),
142157
request.path(),
143158
request.params(),
144159
sqlRequest.cursor());
160+
161+
// Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices.
162+
// The router returns true and sends the response directly if it handled the request.
163+
if (analyticsRouter != null) {
164+
final SQLQueryRequest finalRequest = newSqlRequest;
165+
return channel -> {
166+
if (!analyticsRouter.apply(finalRequest, channel)) {
167+
// Not an analytics query — delegate to normal SQL engine
168+
try {
169+
newSqlQueryHandler
170+
.prepareRequest(
171+
finalRequest,
172+
(ch, ex) -> {
173+
try {
174+
Format fmt = SqlRequestParam.getFormat(request.params());
175+
QueryAction qa = explainRequest(client, sqlRequest, fmt);
176+
executeSqlRequest(request, qa, client, ch);
177+
} catch (Exception e) {
178+
handleException(ch, e);
179+
}
180+
},
181+
this::handleException)
182+
.accept(channel);
183+
} catch (Exception e) {
184+
handleException(channel, e);
185+
}
186+
}
187+
};
188+
}
189+
190+
// Route request to new query engine if it's supported already
145191
return newSqlQueryHandler.prepareRequest(
146192
newSqlRequest,
147193
(restChannel, exception) -> {

plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.opensearch.script.ScriptContext;
5656
import org.opensearch.script.ScriptEngine;
5757
import org.opensearch.script.ScriptService;
58+
import org.opensearch.sql.common.response.ResponseListener;
5859
import org.opensearch.sql.datasource.DataSourceService;
5960
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelper;
6061
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
@@ -85,6 +86,7 @@
8586
import org.opensearch.sql.directquery.transport.model.ExecuteDirectQueryActionResponse;
8687
import org.opensearch.sql.directquery.transport.model.ReadDirectQueryResourcesActionResponse;
8788
import org.opensearch.sql.directquery.transport.model.WriteDirectQueryResourcesActionResponse;
89+
import org.opensearch.sql.executor.QueryType;
8890
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
8991
import org.opensearch.sql.legacy.metrics.Metrics;
9092
import org.opensearch.sql.legacy.plugin.RestSqlAction;
@@ -98,6 +100,8 @@
98100
import org.opensearch.sql.plugin.rest.RestPPLQueryAction;
99101
import org.opensearch.sql.plugin.rest.RestPPLStatsAction;
100102
import org.opensearch.sql.plugin.rest.RestQuerySettingsAction;
103+
import org.opensearch.sql.plugin.rest.RestUnifiedQueryAction;
104+
import org.opensearch.sql.plugin.rest.analytics.stub.StubQueryPlanExecutor;
101105
import org.opensearch.sql.plugin.transport.PPLQueryAction;
102106
import org.opensearch.sql.plugin.transport.TransportPPLQueryAction;
103107
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
@@ -165,7 +169,7 @@ public List<RestHandler> getRestHandlers(
165169
return Arrays.asList(
166170
new RestPPLQueryAction(),
167171
new RestPPLGrammarAction(),
168-
new RestSqlAction(settings, injector),
172+
new RestSqlAction(settings, injector, createSqlAnalyticsRouter()),
169173
new RestSqlStatsAction(settings, restController),
170174
new RestPPLStatsAction(settings, restController),
171175
new RestQuerySettingsAction(settings, restController),
@@ -175,6 +179,83 @@ public List<RestHandler> getRestHandlers(
175179
new RestDirectQueryResourcesManagementAction((OpenSearchSettings) pluginSettings));
176180
}
177181

182+
/**
183+
* Create a routing function for SQL queries targeting analytics engine indices. Returns a {@link
184+
* org.opensearch.rest.BaseRestHandler.RestChannelConsumer} if the query targets an analytics
185+
* index, or {@code null} to fall through to the normal SQL engine.
186+
*/
187+
/**
188+
* Creates a routing function for SQL queries targeting analytics engine indices. Returns {@code
189+
* true} if the query was handled (analytics index), {@code false} to fall through to normal SQL.
190+
*/
191+
private java.util.function.BiFunction<
192+
org.opensearch.sql.sql.domain.SQLQueryRequest, org.opensearch.rest.RestChannel, Boolean>
193+
createSqlAnalyticsRouter() {
194+
RestUnifiedQueryAction unifiedQueryHandler =
195+
new RestUnifiedQueryAction(client, clusterService, new StubQueryPlanExecutor());
196+
return (sqlRequest, channel) -> {
197+
if (!unifiedQueryHandler.isAnalyticsIndex(sqlRequest.getQuery(), QueryType.SQL)) {
198+
return false; // not analytics — fall through
199+
}
200+
if (sqlRequest.isExplainRequest()) {
201+
unifiedQueryHandler.explainSql(
202+
sqlRequest.getQuery(),
203+
QueryType.SQL,
204+
new ResponseListener<>() {
205+
@Override
206+
public void onResponse(
207+
org.opensearch.sql.executor.ExecutionEngine.ExplainResponse response) {
208+
var formatter =
209+
new org.opensearch.sql.protocol.response.format.JsonResponseFormatter<
210+
org.opensearch.sql.executor.ExecutionEngine.ExplainResponse>(
211+
org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style
212+
.PRETTY) {
213+
@Override
214+
protected Object buildJsonObject(
215+
org.opensearch.sql.executor.ExecutionEngine.ExplainResponse resp) {
216+
return resp;
217+
}
218+
};
219+
channel.sendResponse(
220+
new org.opensearch.rest.BytesRestResponse(
221+
org.opensearch.core.rest.RestStatus.OK,
222+
"application/json; charset=UTF-8",
223+
formatter.format(response)));
224+
}
225+
226+
@Override
227+
public void onFailure(Exception e) {
228+
channel.sendResponse(
229+
new org.opensearch.rest.BytesRestResponse(
230+
org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
231+
}
232+
});
233+
} else {
234+
unifiedQueryHandler.executeSql(
235+
sqlRequest.getQuery(),
236+
QueryType.SQL,
237+
new org.opensearch.core.action.ActionListener<TransportPPLQueryResponse>() {
238+
@Override
239+
public void onResponse(TransportPPLQueryResponse response) {
240+
channel.sendResponse(
241+
new org.opensearch.rest.BytesRestResponse(
242+
org.opensearch.core.rest.RestStatus.OK,
243+
"application/json; charset=UTF-8",
244+
response.getResult()));
245+
}
246+
247+
@Override
248+
public void onFailure(Exception e) {
249+
channel.sendResponse(
250+
new org.opensearch.rest.BytesRestResponse(
251+
org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
252+
}
253+
});
254+
}
255+
return true; // handled by analytics engine
256+
};
257+
}
258+
178259
/** Register action and handler so that transportClient can find proxy for action. */
179260
@Override
180261
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {

0 commit comments

Comments
 (0)