Skip to content

Commit 62c6a7e

Browse files
ahkcs authored and dai-chen committed
Integrate SQL REST endpoint with analytics engine path (opensearch-project#5317)
* Integrate SQL REST endpoint with analytics engine path

  Add SQL query routing through the analytics engine for Parquet-backed indices. SQL queries targeting "parquet_*" indices are routed to RestUnifiedQueryAction via the unified query pipeline (Calcite SQL parser -> UnifiedQueryPlanner -> AnalyticsExecutionEngine).

  Changes:
  - Add SqlNode table extraction in RestUnifiedQueryAction.extractIndexName() to support SQL query routing (handles SqlSelect -> SqlIdentifier)
  - Add executeSql() and explainSql() methods in RestUnifiedQueryAction for SQL queries (parallel to existing PPL execute/explain)
  - Add analytics routing in RestSqlAction via optional BiFunction router that checks isAnalyticsIndex before delegating to SQLService
  - Wire the router in SQLPlugin.createSqlAnalyticsRouter()
  - Non-analytics SQL queries fall through to the existing V2 engine
  - Add AnalyticsSQLIT integration tests: SELECT *, column projection, explain, non-parquet fallback, syntax error handling

  Resolves: opensearch-project#5248

  Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Use queryType to branch index extraction instead of instanceof

  Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Use Optional for extractIndexName return type

  Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Unify execute and explain methods for both PPL and SQL paths

  Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Reuse base class explainQuery() in AnalyticsSQLExplainIT

  Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Reuse base class executeQuery() in AnalyticsSQLIT

  Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Remove no-arg constructor and null check for analyticsRouter

  analyticsRouter is always provided — only one caller exists (SQLPlugin.getRestHandlers). Remove the backward-compatible no-arg constructor and the null check.

  Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Extract V2 engine delegation into delegateToV2Engine method

  Restore original logging (query anonymization, explain fallback) that was lost when the fallback logic was inlined into the analytics router lambda.

  Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Refactor SQL table extraction to use SqlBasicVisitor pattern

  Replace instanceof checks with SqlTableNameExtractor visitor, consistent with how PPL uses AbstractNodeVisitor for index name extraction.

  Signed-off-by: Kai Huang <ahkcs@amazon.com>

* Move SqlBasicVisitor to import header

  Signed-off-by: Kai Huang <ahkcs@amazon.com>

---------

Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent a5c00dc commit 62c6a7e

6 files changed

Lines changed: 188 additions & 0 deletions

File tree

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.sql;
7+
8+
import static org.opensearch.sql.legacy.TestUtils.isIndexExist;
9+
import static org.opensearch.sql.util.MatcherUtils.assertJsonEqualsIgnoreId;
10+
11+
import com.google.common.io.Resources;
12+
import java.io.IOException;
13+
import java.net.URI;
14+
import java.nio.file.Files;
15+
import java.nio.file.Paths;
16+
import org.junit.Test;
17+
import org.opensearch.client.Request;
18+
import org.opensearch.sql.legacy.SQLIntegTestCase;
19+
20+
/**
21+
* Explain integration tests for SQL queries routed through the analytics engine path (Project
22+
* Analytics engine). Validates that SQL queries targeting "parquet_*" indices produce correct
23+
* logical plans via the _plugins/_sql/_explain endpoint.
24+
*
25+
* <p>Expected output files are in resources/expectedOutput/analytics_sql/. Each test compares the
26+
* explain JSON output against its expected file.
27+
*/
28+
@SuppressWarnings("deprecation") // assertJsonEqualsIgnoreId is correct for JSON explain response
29+
public class AnalyticsSQLExplainIT extends SQLIntegTestCase {
30+
31+
@Override
32+
protected void init() throws Exception {
33+
if (!isIndexExist(client(), "parquet_logs")) {
34+
Request request = new Request("PUT", "/parquet_logs");
35+
request.setJsonEntity(
36+
"{"
37+
+ "\"mappings\": {"
38+
+ " \"properties\": {"
39+
+ " \"ts\": {\"type\": \"date\"},"
40+
+ " \"status\": {\"type\": \"integer\"},"
41+
+ " \"message\": {\"type\": \"keyword\"},"
42+
+ " \"ip_addr\": {\"type\": \"keyword\"}"
43+
+ " }"
44+
+ "}"
45+
+ "}");
46+
client().performRequest(request);
47+
}
48+
}
49+
50+
private static String loadExpectedJson(String fileName) {
51+
return loadFromFile("expectedOutput/analytics_sql/" + fileName);
52+
}
53+
54+
private static String loadFromFile(String filename) {
55+
try {
56+
URI uri = Resources.getResource(filename).toURI();
57+
return new String(Files.readAllBytes(Paths.get(uri)));
58+
} catch (Exception e) {
59+
throw new RuntimeException(e);
60+
}
61+
}
62+
63+
@Test
64+
public void testExplainSelectStar() throws IOException {
65+
assertJsonEqualsIgnoreId(
66+
loadExpectedJson("explain_select_star.json"), explainQuery("SELECT * FROM parquet_logs"));
67+
}
68+
69+
@Test
70+
public void testExplainSelectColumns() throws IOException {
71+
assertJsonEqualsIgnoreId(
72+
loadExpectedJson("explain_select_columns.json"),
73+
explainQuery("SELECT ts, status FROM parquet_logs"));
74+
}
75+
76+
@Test
77+
public void testExplainSelectWithWhere() throws IOException {
78+
assertJsonEqualsIgnoreId(
79+
loadExpectedJson("explain_select_where.json"),
80+
explainQuery("SELECT ts, message FROM parquet_logs WHERE status = 200"));
81+
}
82+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.sql;
7+
8+
import static org.opensearch.sql.legacy.TestUtils.isIndexExist;
9+
import static org.opensearch.sql.util.MatcherUtils.rows;
10+
import static org.opensearch.sql.util.MatcherUtils.schema;
11+
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
12+
import static org.opensearch.sql.util.MatcherUtils.verifySchema;
13+
14+
import java.io.IOException;
15+
import org.json.JSONObject;
16+
import org.junit.Test;
17+
import org.opensearch.client.Request;
18+
import org.opensearch.client.ResponseException;
19+
import org.opensearch.sql.legacy.SQLIntegTestCase;
20+
21+
/**
22+
* Integration tests for SQL queries routed through the analytics engine path. Queries targeting
23+
* "parquet_*" indices are routed to {@code RestUnifiedQueryAction} which uses {@code
24+
* AnalyticsExecutionEngine} with a stub {@code QueryPlanExecutor}.
25+
*
26+
* <p>The stub executor returns rows in a fixed order [ts, status, message, ip_addr] regardless of
27+
* the plan. The schema from OpenSearchSchemaBuilder is alphabetical [ip_addr, message, status, ts].
28+
* AnalyticsExecutionEngine maps values by position, so the data values appear mismatched. This is
29+
* expected; the real analytics engine will evaluate the plan correctly.
30+
*/
31+
public class AnalyticsSQLIT extends SQLIntegTestCase {
32+
33+
@Override
34+
protected void init() throws Exception {
35+
createParquetLogsIndex();
36+
}
37+
38+
private void createParquetLogsIndex() throws IOException {
39+
if (isIndexExist(client(), "parquet_logs")) {
40+
return;
41+
}
42+
Request request = new Request("PUT", "/parquet_logs");
43+
request.setJsonEntity(
44+
"{"
45+
+ "\"mappings\": {"
46+
+ " \"properties\": {"
47+
+ " \"ts\": {\"type\": \"date\"},"
48+
+ " \"status\": {\"type\": \"integer\"},"
49+
+ " \"message\": {\"type\": \"keyword\"},"
50+
+ " \"ip_addr\": {\"type\": \"keyword\"}"
51+
+ " }"
52+
+ "}"
53+
+ "}");
54+
client().performRequest(request);
55+
}
56+
57+
@Test
58+
public void testSelectStarSchemaAndData() throws IOException {
59+
JSONObject result = executeQuery("SELECT * FROM parquet_logs");
60+
verifySchema(
61+
result,
62+
schema("ip_addr", "string"),
63+
schema("message", "string"),
64+
schema("status", "integer"),
65+
schema("ts", "timestamp"));
66+
// Stub returns [ts, status, message, ip_addr] per row, mapped by position to
67+
// [ip_addr, message, status, ts] schema. Values appear mismatched — expected with stub.
68+
verifyDataRows(
69+
result,
70+
rows("2024-01-15 10:30:00", 200, "Request completed", "192.168.1.1"),
71+
rows("2024-01-15 10:31:00", 200, "Health check OK", "192.168.1.2"),
72+
rows("2024-01-15 10:32:00", 500, "Internal server error", "192.168.1.3"));
73+
}
74+
75+
@Test
76+
public void testSelectSpecificColumns() throws IOException {
77+
JSONObject result = executeQuery("SELECT status, message FROM parquet_logs");
78+
verifySchema(result, schema("status", "integer"), schema("message", "string"));
79+
verifyDataRows(
80+
result,
81+
rows("2024-01-15 10:30:00", 200),
82+
rows("2024-01-15 10:31:00", 200),
83+
rows("2024-01-15 10:32:00", 500));
84+
}
85+
86+
@Test(expected = ResponseException.class)
87+
public void testSyntaxError() throws IOException {
88+
executeQuery("SELEC * FROM parquet_logs");
89+
}
90+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(ts=[$3], status=[$2])\n LogicalTableScan(table=[[opensearch, parquet_logs]])\n"
4+
}
5+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(ip_addr=[$0], message=[$1], status=[$2], ts=[$3])\n LogicalTableScan(table=[[opensearch, parquet_logs]])\n"
4+
}
5+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(ts=[$3], message=[$1])\n LogicalFilter(condition=[=($2, 200)])\n LogicalTableScan(table=[[opensearch, parquet_logs]])\n"
4+
}
5+
}

plugin/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,7 @@ def getJobSchedulerPlugin() {
306306

307307
testClusters.integTest {
308308
plugin(getJobSchedulerPlugin())
309+
plugin provider { (RegularFile) (() -> file("${rootDir}/libs/analytics-engine-3.6.0-SNAPSHOT.zip")) }
309310
plugin(project.tasks.bundlePlugin.archiveFile)
310311
testDistribution = "ARCHIVE"
311312

0 commit comments

Comments
 (0)