Skip to content

Commit 81edddd

Browse files
authored
Support SQL query profiling with the analytics engine (opensearch-project#5475)
* feat(sql): add profile support to analytics path Add a `profile` boolean field to SQLQueryRequest and thread it through the SQL REST handler to the analytics router. Mirrors PPL's existing profile pattern: parsed from the JSON body's `profile` key, gated by the same rules (only honored for non-explain JDBC requests on the analytics engine path). Replaces the hardcoded `false` in SQLPlugin#createSqlAnalyticsRouter so analytics-engine queries now honor the request's profile flag. The V2 SQL engine path is unchanged; profile is silently ignored when the request does not route to the analytics engine. Refs: opensearch-project#5317 Signed-off-by: Chen Dai <daichen@amazon.com> * fix(analytics): preserve profile context across async listener thread QueryPlanExecutor (analytics-framework 3.7+) fires its listener on a worker thread separate from the one that activated QueryProfiling. Without explicit handoff, the listener-side formatter reads an empty profile context, and the synchronous UnifiedQueryContext.close() via try-with-resources fires before async work finishes — closing the Calcite connection while it is still in use and clearing profile state before the listener can read it. - AnalyticsExecutionEngine.execute: capture the active ProfileContext and restore it via QueryProfiling.withCurrentContext inside the response callback so metric reads/writes see the same context. - RestUnifiedQueryAction.execute: defer UnifiedQueryContext.close to listener completion via ActionListener.runAfter, wrapped in a named wrapWithContextClose helper. - Add regression test simulating async listener delivery. Refs: opensearch-project#5317 Signed-off-by: Chen Dai <daichen@amazon.com> --------- Signed-off-by: Chen Dai <daichen@amazon.com>
1 parent d099d3e commit 81edddd

7 files changed

Lines changed: 210 additions & 16 deletions

File tree

core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.opensearch.sql.executor.ExecutionEngine;
3434
import org.opensearch.sql.executor.pagination.Cursor;
3535
import org.opensearch.sql.monitor.profile.MetricName;
36-
import org.opensearch.sql.monitor.profile.ProfileMetric;
36+
import org.opensearch.sql.monitor.profile.ProfileContext;
3737
import org.opensearch.sql.monitor.profile.QueryProfiling;
3838
import org.opensearch.sql.planner.physical.PhysicalPlan;
3939

@@ -81,7 +81,7 @@ public void execute(
8181
// to a worker pool and results arrive on the listener. Record the execute metric in the
8282
// listener callback, before delegating to the user-supplied listener, so the metric snapshot
8383
// taken by SimpleJsonResponseFormatter sees the correct value.
84-
ProfileMetric execMetric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE);
84+
ProfileContext profileCtx = QueryProfiling.current();
8585
long execStart = System.nanoTime();
8686

8787
planExecutor.execute(
@@ -90,15 +90,22 @@ public void execute(
9090
new ActionListener<>() {
9191
@Override
9292
public void onResponse(Iterable<Object[]> rows) {
93-
try {
94-
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
95-
List<ExprValue> results = convertRows(rows, fields);
96-
Schema schema = buildSchema(fields);
97-
execMetric.set(System.nanoTime() - execStart);
98-
listener.onResponse(new QueryResponse(schema, results, Cursor.None));
99-
} catch (Exception e) {
100-
listener.onFailure(e);
101-
}
93+
QueryProfiling.withCurrentContext(
94+
profileCtx,
95+
() -> {
96+
try {
97+
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
98+
List<ExprValue> results = convertRows(rows, fields);
99+
Schema schema = buildSchema(fields);
100+
profileCtx
101+
.getOrCreateMetric(MetricName.EXECUTE)
102+
.set(System.nanoTime() - execStart);
103+
listener.onResponse(new QueryResponse(schema, results, Cursor.None));
104+
} catch (Exception e) {
105+
listener.onFailure(e);
106+
}
107+
return null;
108+
});
102109
}
103110

104111
@Override

core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import static org.junit.jupiter.api.Assertions.assertEquals;
99
import static org.junit.jupiter.api.Assertions.assertNotNull;
10+
import static org.junit.jupiter.api.Assertions.assertSame;
1011
import static org.junit.jupiter.api.Assertions.assertTrue;
1112
import static org.mockito.ArgumentMatchers.any;
1213
import static org.mockito.ArgumentMatchers.eq;
@@ -37,6 +38,8 @@
3738
import org.opensearch.sql.data.type.ExprCoreType;
3839
import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse;
3940
import org.opensearch.sql.executor.ExecutionEngine.QueryResponse;
41+
import org.opensearch.sql.monitor.profile.ProfileContext;
42+
import org.opensearch.sql.monitor.profile.QueryProfiling;
4043
import org.opensearch.sql.planner.physical.PhysicalPlan;
4144

4245
class AnalyticsExecutionEngineTest {
@@ -330,6 +333,49 @@ void physicalPlanExplain_callsOnFailure() {
330333
+ errorRef.get().getMessage());
331334
}
332335

336+
@Test
337+
@SuppressWarnings("unchecked")
338+
void executeRelNode_profilePreservedOnAsyncListener() throws Exception {
339+
ProfileContext expected = QueryProfiling.activate(true);
340+
341+
RelNode relNode = mockRelNode("col", SqlTypeName.VARCHAR);
342+
Iterable<Object[]> rows = Collections.singletonList(new Object[] {"value"});
343+
344+
// Fire the executor's listener on a different thread to simulate async dispatch
345+
doAnswer(
346+
inv -> {
347+
ActionListener<Iterable<Object[]>> al = inv.getArgument(2);
348+
Thread t = new Thread(() -> al.onResponse(rows));
349+
t.start();
350+
t.join();
351+
return null;
352+
})
353+
.when(mockExecutor)
354+
.execute(eq(relNode), any(), any(ActionListener.class));
355+
356+
AtomicReference<ProfileContext> seen = new AtomicReference<>();
357+
engine.execute(
358+
relNode,
359+
mockContext,
360+
new ResponseListener<>() {
361+
@Override
362+
public void onResponse(QueryResponse response) {
363+
seen.set(QueryProfiling.current());
364+
}
365+
366+
@Override
367+
public void onFailure(Exception e) {
368+
throw new AssertionError(e);
369+
}
370+
});
371+
372+
try {
373+
assertSame(expected, seen.get(), "Profile context not restored on async listener thread");
374+
} finally {
375+
QueryProfiling.clear();
376+
}
377+
}
378+
333379
// --- helpers ---
334380

335381
private QueryResponse executeAndCapture(RelNode relNode) {

docs/user/interfaces/endpoint.rst

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,58 @@ Result set::
266266
"status": 200
267267
}
268268

269+
Profile [Experimental]
270+
======================
271+
272+
Description
273+
-----------
274+
275+
Profiling captures per-stage timings (in milliseconds) for SQL query
276+
execution. To enable profiling, set ``"profile": true`` in the request
277+
body alongside ``"query"``.
278+
279+
.. note::
280+
The ``profile`` parameter only takes effect when the query runs on
281+
the Analytics Engine. In all other cases the flag is silently ignored.
282+
283+
Profile output is returned only for regular query execution (not
284+
``_explain``) and only with the default ``format=jdbc``.
285+
286+
Example
287+
-------
288+
289+
Request::
290+
291+
POST /_plugins/_sql
292+
{
293+
"query": "SELECT customer_id, SUM(amount) FROM orders GROUP BY customer_id",
294+
"profile": true
295+
}
296+
297+
Expected output (trimmed)::
298+
299+
{
300+
"profile": {
301+
"summary": {
302+
"total_time_ms": 33.34
303+
},
304+
"phases": {
305+
"analyze": { "time_ms": 8.68 },
306+
"optimize": { "time_ms": 18.2 },
307+
"execute": { "time_ms": 4.87 },
308+
"format": { "time_ms": 0.05 }
309+
},
310+
"plan": {
311+
"node": "EnumerableCalc",
312+
"time_ms": 4.82,
313+
"rows": 2,
314+
"children": [
315+
{ "node": "CalciteEnumerableIndexScan", "time_ms": 4.12, "rows": 2 }
316+
]
317+
}
318+
}
319+
}
320+
269321
Fetch Size (PPL) [Experimental]
270322
================================
271323

plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ public void onFailure(Exception e) {
271271
unifiedQueryHandler.execute(
272272
sqlRequest.getQuery(),
273273
QueryType.SQL,
274-
false,
274+
sqlRequest.isProfileEnabled(),
275275
new ActionListener<>() {
276276
@Override
277277
public void onResponse(TransportPPLQueryResponse response) {

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,15 +118,18 @@ public void execute(
118118
.schedule(
119119
withCurrentContext(
120120
() -> {
121-
try (UnifiedQueryContext context = buildContext(queryType, profiling)) {
121+
UnifiedQueryContext context = buildContext(queryType, profiling);
122+
ActionListener<TransportPPLQueryResponse> closingListener =
123+
wrapWithContextClose(context, listener);
124+
try {
122125
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
123126
RelNode plan = planner.plan(query);
124127
CalcitePlanContext planContext = context.getPlanContext();
125128
plan = addQuerySizeLimit(plan, planContext);
126129
analyticsEngine.execute(
127-
plan, planContext, createQueryListener(queryType, listener));
130+
plan, planContext, createQueryListener(queryType, closingListener));
128131
} catch (Exception e) {
129-
listener.onFailure(e);
132+
closingListener.onFailure(e);
130133
}
131134
}),
132135
new TimeValue(0),
@@ -256,4 +259,17 @@ private static Runnable withCurrentContext(final Runnable task) {
256259
task.run();
257260
};
258261
}
262+
263+
private static ActionListener<TransportPPLQueryResponse> wrapWithContextClose(
264+
UnifiedQueryContext context, ActionListener<TransportPPLQueryResponse> delegate) {
265+
return ActionListener.runAfter(
266+
delegate,
267+
() -> {
268+
try {
269+
context.close();
270+
} catch (Exception e) {
271+
LOG.warn("Failed to close query context", e);
272+
}
273+
});
274+
}
259275
}

sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@
2626
@RequiredArgsConstructor
2727
public class SQLQueryRequest {
2828
private static final String QUERY_FIELD_CURSOR = "cursor";
29+
private static final String QUERY_FIELD_PROFILE = "profile";
2930
private static final Set<String> SUPPORTED_FIELDS =
30-
Set.of("query", "fetch_size", "parameters", QUERY_FIELD_CURSOR);
31+
Set.of("query", "fetch_size", "parameters", QUERY_FIELD_CURSOR, QUERY_FIELD_PROFILE);
3132
private static final String QUERY_PARAMS_FORMAT = "format";
3233
private static final String QUERY_PARAMS_SANITIZE = "sanitize";
3334
private static final String QUERY_PARAMS_PRETTY = "pretty";
@@ -118,6 +119,14 @@ public boolean isExplainRequest() {
118119
return path.endsWith("/_explain");
119120
}
120121

122+
/** Check if profiling should run for this request. */
123+
public boolean isProfileEnabled() {
124+
return jsonContent != null
125+
&& jsonContent.optBoolean(QUERY_FIELD_PROFILE, false)
126+
&& !isExplainRequest()
127+
&& Format.JDBC.getFormatName().equalsIgnoreCase(format);
128+
}
129+
121130
public boolean isCursorCloseRequest() {
122131
return path.endsWith("/close");
123132
}

sql/src/test/java/org/opensearch/sql/sql/domain/SQLQueryRequestTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,70 @@ public void should_support_raw_format() {
284284
assertTrue(csvRequest.isSupported());
285285
}
286286

287+
@Test
288+
public void should_support_query_with_profile_field() {
289+
SQLQueryRequest request =
290+
SQLQueryRequestBuilder.request("SELECT 1")
291+
.jsonContent("{\"query\": \"SELECT 1\", \"profile\": true}")
292+
.build();
293+
assertTrue(request.isSupported());
294+
}
295+
296+
@Test
297+
public void should_disable_profile_when_no_json_content() {
298+
SQLQueryRequest request =
299+
new SQLQueryRequest(null, "SELECT 1", "_plugins/_sql", Map.of(), null);
300+
assertFalse(request.isProfileEnabled());
301+
}
302+
303+
@Test
304+
public void should_disable_profile_when_profile_field_absent() {
305+
SQLQueryRequest request =
306+
new SQLQueryRequest(
307+
new JSONObject("{\"query\": \"SELECT 1\"}"),
308+
"SELECT 1",
309+
"_plugins/_sql",
310+
Map.of(),
311+
null);
312+
assertFalse(request.isProfileEnabled());
313+
}
314+
315+
@Test
316+
public void should_enable_profile_for_jdbc_query() {
317+
SQLQueryRequest request =
318+
new SQLQueryRequest(
319+
new JSONObject("{\"query\": \"SELECT 1\", \"profile\": true}"),
320+
"SELECT 1",
321+
"_plugins/_sql",
322+
Map.of(),
323+
null);
324+
assertTrue(request.isProfileEnabled());
325+
}
326+
327+
@Test
328+
public void should_disable_profile_on_explain_path() {
329+
SQLQueryRequest request =
330+
new SQLQueryRequest(
331+
new JSONObject("{\"query\": \"SELECT 1\", \"profile\": true}"),
332+
"SELECT 1",
333+
"_plugins/_sql/_explain",
334+
Map.of(),
335+
null);
336+
assertFalse(request.isProfileEnabled());
337+
}
338+
339+
@Test
340+
public void should_disable_profile_for_non_jdbc_format() {
341+
SQLQueryRequest request =
342+
new SQLQueryRequest(
343+
new JSONObject("{\"query\": \"SELECT 1\", \"profile\": true}"),
344+
"SELECT 1",
345+
"_plugins/_sql",
346+
Map.of("format", "csv"),
347+
null);
348+
assertFalse(request.isProfileEnabled());
349+
}
350+
287351
/** SQL query request build helper to improve test data setup readability. */
288352
private static class SQLQueryRequestBuilder {
289353
private String jsonContent;

0 commit comments

Comments
 (0)