Skip to content

Commit 7b63b49

Browse files
committed
Integrate SQL REST endpoint with analytics engine path
Add SQL query routing through the analytics engine for Parquet-backed indices. SQL queries targeting "parquet_*" indices are routed to RestUnifiedQueryAction via the unified query pipeline (Calcite SQL parser -> UnifiedQueryPlanner -> AnalyticsExecutionEngine). Changes: - Add SqlNode table extraction in RestUnifiedQueryAction.extractIndexName() to support SQL query routing (handles SqlSelect -> SqlIdentifier) - Add executeSql() and explainSql() methods in RestUnifiedQueryAction for SQL queries (parallel to existing PPL execute/explain) - Add analytics routing in RestSqlAction via optional BiFunction router that checks isAnalyticsIndex before delegating to SQLService - Wire the router in SQLPlugin.createSqlAnalyticsRouter() - Non-analytics SQL queries fall through to the existing V2 engine - Add AnalyticsSQLIT integration tests: SELECT *, column projection, explain, non-parquet fallback, syntax error handling Resolves: #5248 Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 24dd79c commit 7b63b49

4 files changed

Lines changed: 321 additions & 3 deletions

File tree

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.sql;
7+
8+
import static org.opensearch.sql.legacy.TestUtils.getResponseBody;
9+
import static org.opensearch.sql.legacy.TestUtils.isIndexExist;
10+
import static org.opensearch.sql.util.MatcherUtils.schema;
11+
import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows;
12+
import static org.opensearch.sql.util.MatcherUtils.verifySchema;
13+
14+
import java.io.IOException;
15+
import org.json.JSONObject;
16+
import org.junit.Test;
17+
import org.opensearch.client.Request;
18+
import org.opensearch.client.RequestOptions;
19+
import org.opensearch.client.Response;
20+
import org.opensearch.client.ResponseException;
21+
import org.opensearch.sql.legacy.SQLIntegTestCase;
22+
import org.opensearch.sql.legacy.TestsConstants;
23+
24+
/**
25+
* Integration tests for SQL queries routed through the analytics engine path (Project Mustang).
26+
* Queries targeting "parquet_*" indices are routed to {@code RestUnifiedQueryAction} which uses
27+
* {@code AnalyticsExecutionEngine} with a stub {@code QueryPlanExecutor}.
28+
*
29+
* <p>These tests validate the full pipeline: REST SQL request -> analytics routing -> planning via
30+
* UnifiedQueryPlanner (Calcite SQL parser) -> execution via AnalyticsExecutionEngine -> response
31+
* formatting.
32+
*/
33+
public class AnalyticsSQLIT extends SQLIntegTestCase {
34+
35+
@Override
36+
protected void init() throws Exception {
37+
createParquetLogsIndex();
38+
}
39+
40+
private void createParquetLogsIndex() throws IOException {
41+
if (isIndexExist(client(), "parquet_logs")) {
42+
return;
43+
}
44+
Request request = new Request("PUT", "/parquet_logs");
45+
request.setJsonEntity(
46+
"{"
47+
+ "\"mappings\": {"
48+
+ " \"properties\": {"
49+
+ " \"ts\": {\"type\": \"date\"},"
50+
+ " \"status\": {\"type\": \"integer\"},"
51+
+ " \"message\": {\"type\": \"keyword\"},"
52+
+ " \"ip_addr\": {\"type\": \"keyword\"}"
53+
+ " }"
54+
+ "}"
55+
+ "}");
56+
client().performRequest(request);
57+
}
58+
59+
private JSONObject executeSqlQuery(String sql) throws IOException {
60+
Request request = new Request("POST", "/_plugins/_sql");
61+
request.setJsonEntity("{\"query\": \"" + sql + "\"}");
62+
request.setOptions(RequestOptions.DEFAULT);
63+
Response response = client().performRequest(request);
64+
return new JSONObject(getResponseBody(response));
65+
}
66+
67+
private JSONObject explainSqlQuery(String sql) throws IOException {
68+
Request request = new Request("POST", "/_plugins/_sql/_explain");
69+
request.setJsonEntity("{\"query\": \"" + sql + "\"}");
70+
request.setOptions(RequestOptions.DEFAULT);
71+
Response response = client().performRequest(request);
72+
return new JSONObject(getResponseBody(response));
73+
}
74+
75+
@Test
76+
public void testSelectStarFromParquetIndex() throws IOException {
77+
JSONObject result = executeSqlQuery("SELECT * FROM parquet_logs");
78+
verifySchema(
79+
result,
80+
schema("ip_addr", "string"),
81+
schema("message", "string"),
82+
schema("status", "integer"),
83+
schema("ts", "timestamp"));
84+
verifyNumOfRows(result, 3);
85+
}
86+
87+
@Test
88+
public void testSelectSpecificColumns() throws IOException {
89+
JSONObject result = executeSqlQuery("SELECT status, message FROM parquet_logs");
90+
verifySchema(result, schema("status", "integer"), schema("message", "string"));
91+
verifyNumOfRows(result, 3);
92+
}
93+
94+
@Test
95+
public void testExplainSelectStar() throws IOException {
96+
JSONObject result = explainSqlQuery("SELECT * FROM parquet_logs");
97+
assertTrue(result.has("calcite"));
98+
JSONObject calcite = result.getJSONObject("calcite");
99+
assertTrue(calcite.has("logical"));
100+
String logical = calcite.getString("logical");
101+
assertTrue(logical.contains("LogicalTableScan"));
102+
assertTrue(logical.contains("parquet_logs"));
103+
}
104+
105+
@Test
106+
public void testNonParquetIndexFallsThroughToV2() throws IOException {
107+
// Verify that a regular (non-parquet) SQL query still works via the normal V2 engine
108+
loadIndex(Index.ACCOUNT);
109+
JSONObject result =
110+
executeQuery("SELECT firstname FROM " + TestsConstants.TEST_INDEX_ACCOUNT + " LIMIT 1");
111+
verifyNumOfRows(result, 1);
112+
}
113+
114+
@Test
115+
public void testSyntaxError() {
116+
try {
117+
executeSqlQuery("SELEC * FROM parquet_logs");
118+
fail("Expected syntax error");
119+
} catch (ResponseException e) {
120+
assertTrue(e.getMessage().contains("400"));
121+
} catch (IOException e) {
122+
fail("Unexpected IOException: " + e.getMessage());
123+
}
124+
}
125+
}

legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Map;
2020
import java.util.Optional;
2121
import java.util.Set;
22+
import java.util.function.BiFunction;
2223
import java.util.function.Predicate;
2324
import java.util.regex.Pattern;
2425
import org.apache.logging.log4j.LogManager;
@@ -83,10 +84,25 @@ public class RestSqlAction extends BaseRestHandler {
8384
/** New SQL query request handler. */
8485
private final RestSQLQueryAction newSqlQueryHandler;
8586

87+
/**
88+
* Optional analytics router. If set, it's called before the normal SQL engine. Accepts the
89+
* request and channel, returns {@code true} if it handled the request, {@code false} to fall
90+
* through to normal SQL engine.
91+
*/
92+
private final BiFunction<SQLQueryRequest, RestChannel, Boolean> analyticsRouter;
93+
8694
public RestSqlAction(Settings settings, Injector injector) {
95+
this(settings, injector, null);
96+
}
97+
98+
public RestSqlAction(
99+
Settings settings,
100+
Injector injector,
101+
BiFunction<SQLQueryRequest, RestChannel, Boolean> analyticsRouter) {
87102
super();
88103
this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
89104
this.newSqlQueryHandler = new RestSQLQueryAction(injector);
105+
this.analyticsRouter = analyticsRouter;
90106
}
91107

92108
@Override
@@ -134,14 +150,44 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
134150

135151
Format format = SqlRequestParam.getFormat(request.params());
136152

137-
// Route request to new query engine if it's supported already
138153
SQLQueryRequest newSqlRequest =
139154
new SQLQueryRequest(
140155
sqlRequest.getJsonContent(),
141156
sqlRequest.getSql(),
142157
request.path(),
143158
request.params(),
144159
sqlRequest.cursor());
160+
161+
// Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices.
162+
// The router returns true and sends the response directly if it handled the request.
163+
if (analyticsRouter != null) {
164+
final SQLQueryRequest finalRequest = newSqlRequest;
165+
return channel -> {
166+
if (!analyticsRouter.apply(finalRequest, channel)) {
167+
// Not an analytics query — delegate to normal SQL engine
168+
try {
169+
newSqlQueryHandler
170+
.prepareRequest(
171+
finalRequest,
172+
(ch, ex) -> {
173+
try {
174+
Format fmt = SqlRequestParam.getFormat(request.params());
175+
QueryAction qa = explainRequest(client, sqlRequest, fmt);
176+
executeSqlRequest(request, qa, client, ch);
177+
} catch (Exception e) {
178+
handleException(ch, e);
179+
}
180+
},
181+
this::handleException)
182+
.accept(channel);
183+
} catch (Exception e) {
184+
handleException(channel, e);
185+
}
186+
}
187+
};
188+
}
189+
190+
// Route request to new query engine if it's supported already
145191
return newSqlQueryHandler.prepareRequest(
146192
newSqlRequest,
147193
(restChannel, exception) -> {

plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.Collection;
1919
import java.util.List;
2020
import java.util.Objects;
21+
import java.util.function.BiFunction;
2122
import java.util.function.Supplier;
2223
import org.apache.commons.lang3.StringUtils;
2324
import org.apache.logging.log4j.LogManager;
@@ -36,8 +37,10 @@
3637
import org.opensearch.common.settings.Settings;
3738
import org.opensearch.common.settings.SettingsFilter;
3839
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
40+
import org.opensearch.core.action.ActionListener;
3941
import org.opensearch.core.action.ActionResponse;
4042
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
43+
import org.opensearch.core.rest.RestStatus;
4144
import org.opensearch.core.xcontent.NamedXContentRegistry;
4245
import org.opensearch.env.Environment;
4346
import org.opensearch.env.NodeEnvironment;
@@ -50,11 +53,14 @@
5053
import org.opensearch.plugins.ScriptPlugin;
5154
import org.opensearch.plugins.SystemIndexPlugin;
5255
import org.opensearch.repositories.RepositoriesService;
56+
import org.opensearch.rest.BytesRestResponse;
57+
import org.opensearch.rest.RestChannel;
5358
import org.opensearch.rest.RestController;
5459
import org.opensearch.rest.RestHandler;
5560
import org.opensearch.script.ScriptContext;
5661
import org.opensearch.script.ScriptEngine;
5762
import org.opensearch.script.ScriptService;
63+
import org.opensearch.sql.common.response.ResponseListener;
5864
import org.opensearch.sql.datasource.DataSourceService;
5965
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelper;
6066
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
@@ -85,6 +91,8 @@
8591
import org.opensearch.sql.directquery.transport.model.ExecuteDirectQueryActionResponse;
8692
import org.opensearch.sql.directquery.transport.model.ReadDirectQueryResourcesActionResponse;
8793
import org.opensearch.sql.directquery.transport.model.WriteDirectQueryResourcesActionResponse;
94+
import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse;
95+
import org.opensearch.sql.executor.QueryType;
8896
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
8997
import org.opensearch.sql.legacy.metrics.Metrics;
9098
import org.opensearch.sql.legacy.plugin.RestSqlAction;
@@ -98,10 +106,14 @@
98106
import org.opensearch.sql.plugin.rest.RestPPLQueryAction;
99107
import org.opensearch.sql.plugin.rest.RestPPLStatsAction;
100108
import org.opensearch.sql.plugin.rest.RestQuerySettingsAction;
109+
import org.opensearch.sql.plugin.rest.RestUnifiedQueryAction;
110+
import org.opensearch.sql.plugin.rest.analytics.stub.StubQueryPlanExecutor;
101111
import org.opensearch.sql.plugin.transport.PPLQueryAction;
102112
import org.opensearch.sql.plugin.transport.TransportPPLQueryAction;
103113
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
104114
import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
115+
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
116+
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style;
105117
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
106118
import org.opensearch.sql.spark.cluster.ClusterManagerEventListener;
107119
import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl;
@@ -117,6 +129,7 @@
117129
import org.opensearch.sql.spark.transport.model.CancelAsyncQueryActionResponse;
118130
import org.opensearch.sql.spark.transport.model.CreateAsyncQueryActionResponse;
119131
import org.opensearch.sql.spark.transport.model.GetAsyncQueryResultActionResponse;
132+
import org.opensearch.sql.sql.domain.SQLQueryRequest;
120133
import org.opensearch.sql.storage.DataSourceFactory;
121134
import org.opensearch.threadpool.ExecutorBuilder;
122135
import org.opensearch.threadpool.FixedExecutorBuilder;
@@ -165,7 +178,7 @@ public List<RestHandler> getRestHandlers(
165178
return Arrays.asList(
166179
new RestPPLQueryAction(),
167180
new RestPPLGrammarAction(),
168-
new RestSqlAction(settings, injector),
181+
new RestSqlAction(settings, injector, createSqlAnalyticsRouter()),
169182
new RestSqlStatsAction(settings, restController),
170183
new RestPPLStatsAction(settings, restController),
171184
new RestQuerySettingsAction(settings, restController),
@@ -175,6 +188,67 @@ public List<RestHandler> getRestHandlers(
175188
new RestDirectQueryResourcesManagementAction((OpenSearchSettings) pluginSettings));
176189
}
177190

191+
/**
192+
* Creates a routing function for SQL queries targeting analytics engine indices. Returns {@code
193+
* true} if the query was handled (analytics index), {@code false} to fall through to normal SQL.
194+
*/
195+
private BiFunction<SQLQueryRequest, RestChannel, Boolean> createSqlAnalyticsRouter() {
196+
RestUnifiedQueryAction unifiedQueryHandler =
197+
new RestUnifiedQueryAction(client, clusterService, new StubQueryPlanExecutor());
198+
return (sqlRequest, channel) -> {
199+
if (!unifiedQueryHandler.isAnalyticsIndex(sqlRequest.getQuery(), QueryType.SQL)) {
200+
return false;
201+
}
202+
if (sqlRequest.isExplainRequest()) {
203+
unifiedQueryHandler.explainSql(
204+
sqlRequest.getQuery(),
205+
QueryType.SQL,
206+
new ResponseListener<>() {
207+
@Override
208+
public void onResponse(ExplainResponse response) {
209+
JsonResponseFormatter<ExplainResponse> formatter =
210+
new JsonResponseFormatter<>(Style.PRETTY) {
211+
@Override
212+
protected Object buildJsonObject(ExplainResponse resp) {
213+
return resp;
214+
}
215+
};
216+
channel.sendResponse(
217+
new BytesRestResponse(
218+
RestStatus.OK,
219+
"application/json; charset=UTF-8",
220+
formatter.format(response)));
221+
}
222+
223+
@Override
224+
public void onFailure(Exception e) {
225+
channel.sendResponse(
226+
new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
227+
}
228+
});
229+
} else {
230+
unifiedQueryHandler.executeSql(
231+
sqlRequest.getQuery(),
232+
QueryType.SQL,
233+
new ActionListener<>() {
234+
@Override
235+
public void onResponse(TransportPPLQueryResponse response) {
236+
channel.sendResponse(
237+
new BytesRestResponse(
238+
RestStatus.OK, "application/json; charset=UTF-8", response.getResult()));
239+
}
240+
241+
@Override
242+
public void onFailure(Exception e) {
243+
channel.sendResponse(
244+
new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
245+
}
246+
});
247+
}
248+
return true;
249+
};
250+
}
251+
178252
/** Register action and handler so that transportClient can find proxy for action. */
179253
@Override
180254
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {

0 commit comments

Comments
 (0)