Skip to content

Commit 151cffc

Browse files
authored
Enable profiling and migrate to UnifiedQueryParser (#5285)
* [Mustang] Enable profiling and migrate to UnifiedQueryParser (#5247) Task 1: Enable profiling (#5268) - Add .profiling(pplRequest.profile()) to UnifiedQueryContext.builder() in both doExecute and doExplain Task 2: Migrate to UnifiedQueryParser for index extraction (#5274) - Replace StubIndexDetector regex with PPLQueryParser AST-based extraction: parse query, walk AST to find Relation node, extract table name via getTableQualifiedName() - Delete StubIndexDetector - isAnalyticsIndex() is now an instance method (needs PPLQueryParser) - Constructor takes Settings for PPLQueryParser Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Switch to SimpleJsonResponseFormatter for profiling support Switch from JdbcResponseFormatter to SimpleJsonResponseFormatter so profiling data is included in the response when profile=true. The SimpleJsonResponseFormatter calls QueryProfiling.current().finish() to populate the profile field. Update test assertions to match SimpleJsonResponseFormatter type names (PPL_SPEC: INTEGER -> "int", STRING -> "string") and remove status field check (not included by SimpleJsonResponseFormatter). Add integration test verifying profile field appears in response. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Use context parser for index extraction instead of standalone PPLQueryParser Create UnifiedQueryContext upfront in isAnalyticsIndex() and use context.getParser() for index name extraction. This reuses the context-owned parser which supports both PPL and SQL, making it ready for unified SQL support without code changes. Remove standalone PPLQueryParser field and Settings constructor param. isAnalyticsIndex() now takes QueryType to create the right context. extractIndexName() handles UnresolvedPlan (PPL) with a TODO for SqlNode (SQL) when unified SQL is enabled. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Use AST visitor for index name extraction Replace manual tree walking (findRelation) with IndexNameExtractor visitor extending AbstractNodeVisitor. The visitor's visitRelation() is called automatically by the AST accept/visitChildren pattern, which handles tree traversal. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Wrap execute and explain with context.measure() for profiling Wrap analyticsEngine.execute() and analyticsEngine.explain() calls with context.measure(MetricName.EXECUTE, ...) so execution time is captured in the profiling metrics. Planning is auto-profiled by UnifiedQueryPlanner. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> * Fix EXECUTE profiling metric by recording inside AnalyticsExecutionEngine Move EXECUTE metric recording into AnalyticsExecutionEngine.execute(), between the actual execution (planExecutor + row conversion) and the listener.onResponse() call. This ensures the metric is written before SimpleJsonResponseFormatter calls QueryProfiling.finish() to snapshot. Previously context.measure() was used in RestUnifiedQueryAction, but finish() was called inside the listener callback (synchronously) before measure()'s finally block could write the metric, resulting in 0ms. Add IT assertion that execute phase time_ms > 0 to catch this bug. Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com> --------- Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 2920fc5 commit 151cffc

6 files changed

Lines changed: 177 additions & 249 deletions

File tree

core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import org.opensearch.sql.executor.ExecutionContext;
2525
import org.opensearch.sql.executor.ExecutionEngine;
2626
import org.opensearch.sql.executor.pagination.Cursor;
27+
import org.opensearch.sql.monitor.profile.MetricName;
28+
import org.opensearch.sql.monitor.profile.ProfileMetric;
29+
import org.opensearch.sql.monitor.profile.QueryProfiling;
2730
import org.opensearch.sql.planner.physical.PhysicalPlan;
2831

2932
/**
@@ -67,12 +70,20 @@ public void explain(PhysicalPlan plan, ResponseListener<ExplainResponse> listene
6770
public void execute(
6871
RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
6972
try {
73+
// Record EXECUTE metric before calling listener, because the listener's onResponse
74+
// triggers SimpleJsonResponseFormatter which calls QueryProfiling.finish() to snapshot
75+
// all metrics. The metric must be written before that snapshot.
76+
ProfileMetric execMetric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE);
77+
long execStart = System.nanoTime();
78+
7079
Iterable<Object[]> rows = planExecutor.execute(plan, null);
7180

7281
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
7382
List<ExprValue> results = convertRows(rows, fields);
7483
Schema schema = buildSchema(fields);
7584

85+
execMetric.set(System.nanoTime() - execStart);
86+
7687
listener.onResponse(new QueryResponse(schema, results, Cursor.None));
7788
} catch (Exception e) {
7889
listener.onFailure(e);

integ-test/src/test/java/org/opensearch/sql/ppl/AnalyticsPPLIT.java

Lines changed: 69 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,20 @@
77

88
import static org.opensearch.sql.legacy.TestUtils.getResponseBody;
99
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT;
10+
import static org.opensearch.sql.plugin.rest.RestPPLQueryAction.QUERY_API_ENDPOINT;
1011
import static org.opensearch.sql.util.MatcherUtils.rows;
1112
import static org.opensearch.sql.util.MatcherUtils.schema;
1213
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
1314
import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows;
1415
import static org.opensearch.sql.util.MatcherUtils.verifySchema;
1516

1617
import java.io.IOException;
17-
import org.apache.logging.log4j.LogManager;
18-
import org.apache.logging.log4j.Logger;
18+
import java.util.Locale;
1919
import org.json.JSONObject;
2020
import org.junit.Test;
21+
import org.opensearch.client.Request;
22+
import org.opensearch.client.RequestOptions;
23+
import org.opensearch.client.Response;
2124
import org.opensearch.client.ResponseException;
2225

2326
/**
@@ -35,8 +38,6 @@
3538
*/
3639
public class AnalyticsPPLIT extends PPLIntegTestCase {
3740

38-
private static final Logger LOG = LogManager.getLogger(AnalyticsPPLIT.class);
39-
4041
@Override
4142
protected void init() throws Exception {
4243
// No index loading needed -- stub schema and data are hardcoded
@@ -47,16 +48,13 @@ protected void init() throws Exception {
4748

4849
@Test
4950
public void testBasicQuerySchemaAndData() throws IOException {
50-
String query = "source = opensearch.parquet_logs";
51-
JSONObject result = executeQuery(query);
52-
LOG.info("[testBasicQuerySchemaAndData] query: {}\nresponse: {}", query, result.toString(2));
53-
51+
JSONObject result = executeQuery("source = opensearch.parquet_logs");
5452
verifySchema(
5553
result,
5654
schema("ts", "timestamp"),
57-
schema("status", "integer"),
58-
schema("message", "keyword"),
59-
schema("ip_addr", "keyword"));
55+
schema("status", "int"),
56+
schema("message", "string"),
57+
schema("ip_addr", "string"));
6058
verifyNumOfRows(result, 3);
6159
verifyDataRows(
6260
result,
@@ -67,17 +65,13 @@ public void testBasicQuerySchemaAndData() throws IOException {
6765

6866
@Test
6967
public void testParquetMetricsSchemaAndData() throws IOException {
70-
String query = "source = opensearch.parquet_metrics";
71-
JSONObject result = executeQuery(query);
72-
LOG.info(
73-
"[testParquetMetricsSchemaAndData] query: {}\nresponse: {}", query, result.toString(2));
74-
68+
JSONObject result = executeQuery("source = opensearch.parquet_metrics");
7569
verifySchema(
7670
result,
7771
schema("ts", "timestamp"),
7872
schema("cpu", "double"),
7973
schema("memory", "double"),
80-
schema("host", "keyword"));
74+
schema("host", "string"));
8175
verifyNumOfRows(result, 2);
8276
verifyDataRows(
8377
result,
@@ -89,120 +83,100 @@ public void testParquetMetricsSchemaAndData() throws IOException {
8983

9084
@Test
9185
public void testResponseFormatHasRequiredFields() throws IOException {
92-
String query = "source = opensearch.parquet_logs";
93-
JSONObject result = executeQuery(query);
94-
LOG.info(
95-
"[testResponseFormatHasRequiredFields] query: {}\nresponse: {}", query, result.toString(2));
96-
97-
String msg = "Full response: " + result.toString(2);
98-
assertTrue("Response missing 'schema'. " + msg, result.has("schema"));
99-
assertTrue("Response missing 'datarows'. " + msg, result.has("datarows"));
100-
assertTrue("Response missing 'total'. " + msg, result.has("total"));
101-
assertTrue("Response missing 'size'. " + msg, result.has("size"));
102-
assertTrue("Response missing 'status'. " + msg, result.has("status"));
103-
assertEquals(
104-
"Expected status 200 but got " + result.getInt("status") + ". " + msg,
105-
200,
106-
result.getInt("status"));
86+
JSONObject result = executeQuery("source = opensearch.parquet_logs");
87+
assertTrue("Response missing 'schema'", result.has("schema"));
88+
assertTrue("Response missing 'datarows'", result.has("datarows"));
89+
assertTrue("Response missing 'total'", result.has("total"));
90+
assertTrue("Response missing 'size'", result.has("size"));
10791
}
10892

10993
@Test
11094
public void testTotalAndSizeMatchRowCount() throws IOException {
111-
String query = "source = opensearch.parquet_logs";
112-
JSONObject result = executeQuery(query);
113-
LOG.info("[testTotalAndSizeMatchRowCount] query: {}\nresponse: {}", query, result.toString(2));
114-
95+
JSONObject result = executeQuery("source = opensearch.parquet_logs");
11596
int rowCount = result.getJSONArray("datarows").length();
116-
assertEquals(
117-
String.format(
118-
"total should match row count. rows=%d, total=%d, size=%d. Response: %s",
119-
rowCount, result.getInt("total"), result.getInt("size"), result.toString(2)),
120-
rowCount,
121-
result.getInt("total"));
122-
assertEquals(
123-
String.format(
124-
"size should match row count. rows=%d, size=%d. Response: %s",
125-
rowCount, result.getInt("size"), result.toString(2)),
126-
rowCount,
127-
result.getInt("size"));
97+
assertEquals(rowCount, result.getInt("total"));
98+
assertEquals(rowCount, result.getInt("size"));
12899
}
129100

130101
// --- Projection tests (schema verification -- stub doesn't evaluate projections) ---
131102

132103
@Test
133104
public void testFieldsProjectionChangesSchema() throws IOException {
134-
String query = "source = opensearch.parquet_logs | fields ts, message";
135-
JSONObject result = executeQuery(query);
136-
LOG.info(
137-
"[testFieldsProjectionChangesSchema] query: {}\nresponse: {}", query, result.toString(2));
138-
139-
verifySchema(result, schema("ts", "timestamp"), schema("message", "keyword"));
105+
JSONObject result = executeQuery("source = opensearch.parquet_logs | fields ts, message");
106+
verifySchema(result, schema("ts", "timestamp"), schema("message", "string"));
140107
verifyNumOfRows(result, 3);
141108
}
142109

143110
@Test
144111
public void testSingleFieldProjection() throws IOException {
145-
String query = "source = opensearch.parquet_logs | fields status";
146-
JSONObject result = executeQuery(query);
147-
LOG.info("[testSingleFieldProjection] query: {}\nresponse: {}", query, result.toString(2));
148-
149-
verifySchema(result, schema("status", "integer"));
112+
JSONObject result = executeQuery("source = opensearch.parquet_logs | fields status");
113+
verifySchema(result, schema("status", "int"));
150114
verifyNumOfRows(result, 3);
151115
}
152116

117+
// --- Profiling tests ---
118+
119+
@Test
120+
public void testProfileResponseIncludesProfilingData() throws IOException {
121+
Request request = new Request("POST", QUERY_API_ENDPOINT);
122+
request.setJsonEntity(
123+
String.format(
124+
Locale.ROOT,
125+
"{\n \"query\": \"%s\",\n \"profile\": true\n}",
126+
"source = opensearch.parquet_logs"));
127+
RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder();
128+
restOptionsBuilder.addHeader("Content-Type", "application/json");
129+
request.setOptions(restOptionsBuilder);
130+
Response response = client().performRequest(request);
131+
JSONObject result = new JSONObject(getResponseBody(response, true));
132+
133+
assertTrue("Response should have 'profile' field when profile=true", result.has("profile"));
134+
JSONObject profile = result.getJSONObject("profile");
135+
assertTrue("Profile should have 'phases' field", profile.has("phases"));
136+
JSONObject phases = profile.getJSONObject("phases");
137+
138+
assertTrue("Phases should have 'analyze' field", phases.has("analyze"));
139+
double analyzeTime = phases.getJSONObject("analyze").getDouble("time_ms");
140+
assertTrue(
141+
"Analyze phase should have non-zero time, got " + analyzeTime + "ms", analyzeTime > 0);
142+
143+
assertTrue("Phases should have 'execute' field", phases.has("execute"));
144+
double executeTime = phases.getJSONObject("execute").getDouble("time_ms");
145+
assertTrue(
146+
"Execute phase should have non-zero time, got " + executeTime + "ms", executeTime > 0);
147+
}
148+
153149
// --- Error handling tests ---
154150

155151
@Test
156-
public void testSyntaxErrorReturnsClientError() throws IOException {
157-
String query = "source = opensearch.parquet_logs | invalid_command";
158-
ResponseException e = assertThrows(ResponseException.class, () -> executeQuery(query));
152+
public void testSyntaxErrorReturnsClientError() {
153+
ResponseException e =
154+
assertThrows(
155+
ResponseException.class,
156+
() -> executeQuery("source = opensearch.parquet_logs | invalid_command"));
159157
int statusCode = e.getResponse().getStatusLine().getStatusCode();
160-
String responseBody = getResponseBody(e.getResponse(), true);
161-
LOG.info(
162-
"[testSyntaxErrorReturnsClientError] query: {}\nstatus: {}\nresponse: {}",
163-
query,
164-
statusCode,
165-
responseBody);
166-
167158
assertTrue(
168-
String.format(
169-
"Syntax error should return 4xx, got %d. Response: %s", statusCode, responseBody),
170-
statusCode >= 400 && statusCode < 500);
159+
"Syntax error should return 4xx, got " + statusCode, statusCode >= 400 && statusCode < 500);
171160
}
172161

173162
// --- Regression tests ---
174163

175164
@Test
176165
public void testNonParquetQueryStillWorks() throws IOException {
177166
loadIndex(Index.ACCOUNT);
178-
String query = String.format("source=%s | head 1 | fields firstname", TEST_INDEX_ACCOUNT);
179-
JSONObject result = executeQuery(query);
180-
LOG.info("[testNonParquetQueryStillWorks] query: {}\nresponse: {}", query, result.toString(2));
181-
182-
assertNotNull("Non-parquet query returned null. Query: " + query, result);
167+
JSONObject result =
168+
executeQuery(String.format("source=%s | head 1 | fields firstname", TEST_INDEX_ACCOUNT));
169+
assertNotNull(result);
170+
assertTrue("Non-parquet query should have datarows", result.has("datarows"));
183171
assertTrue(
184-
"Non-parquet query missing 'datarows'. Response: " + result.toString(2),
185-
result.has("datarows"));
186-
int rowCount = result.getJSONArray("datarows").length();
187-
assertTrue(
188-
String.format(
189-
"Non-parquet query returned 0 rows. Expected > 0. Response: %s", result.toString(2)),
190-
rowCount > 0);
172+
"Non-parquet query should return data", result.getJSONArray("datarows").length() > 0);
191173
}
192174

193175
@Test
194176
public void testNonParquetAggregationStillWorks() throws IOException {
195177
loadIndex(Index.ACCOUNT);
196-
String query = String.format("source=%s | stats count()", TEST_INDEX_ACCOUNT);
197-
JSONObject result = executeQuery(query);
198-
LOG.info(
199-
"[testNonParquetAggregationStillWorks] query: {}\nresponse: {}", query, result.toString(2));
200-
201-
int total = result.getInt("total");
202-
assertTrue(
203-
String.format(
204-
"Non-parquet aggregation returned total=%d, expected > 0. Response: %s",
205-
total, result.toString(2)),
206-
total > 0);
178+
JSONObject result =
179+
executeQuery(String.format("source=%s | stats count()", TEST_INDEX_ACCOUNT));
180+
assertTrue("Non-parquet aggregation should work", result.getInt("total") > 0);
207181
}
208182
}

0 commit comments

Comments
 (0)