From f022dd9ac16395074104ef7545376e387e56bd5c Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 27 May 2026 10:46:44 -0700 Subject: [PATCH 1/2] feat(sql): add profile support to analytics path Add a `profile` boolean field to SQLQueryRequest and thread it through the SQL REST handler to the analytics router. Mirrors PPL's existing profile pattern: parsed from the JSON body's `profile` key, gated by the same rules (only honored for non-explain JDBC requests on the analytics engine path). Replaces the hardcoded `false` in SQLPlugin#createSqlAnalyticsRouter so analytics-engine queries now honor the request's profile flag. The V2 SQL engine path is unchanged; profile is silently ignored when the request does not route to the analytics engine. Refs: #5317 Signed-off-by: Chen Dai --- docs/user/interfaces/endpoint.rst | 52 +++++++++++++++ .../org/opensearch/sql/plugin/SQLPlugin.java | 2 +- .../sql/sql/domain/SQLQueryRequest.java | 11 +++- .../sql/sql/domain/SQLQueryRequestTest.java | 64 +++++++++++++++++++ 4 files changed, 127 insertions(+), 2 deletions(-) diff --git a/docs/user/interfaces/endpoint.rst b/docs/user/interfaces/endpoint.rst index 1deab285103..bd5a9637c5b 100644 --- a/docs/user/interfaces/endpoint.rst +++ b/docs/user/interfaces/endpoint.rst @@ -266,6 +266,58 @@ Result set:: "status": 200 } +Profile [Experimental] +====================== + +Description +----------- + +Profiling captures per-stage timings (in milliseconds) for SQL query +execution. To enable profiling, set ``"profile": true`` in the request +body alongside ``"query"``. + +.. note:: + The ``profile`` parameter only takes effect when the query runs on + the Analytics Engine. In all other cases the flag is silently ignored. + + Profile output is returned only for regular query execution (not + ``_explain``) and only with the default ``format=jdbc``. + +Example +------- + +Request:: + + POST /_plugins/_sql + { + "query": "SELECT customer_id, SUM(amount) FROM orders GROUP BY customer_id", + "profile": true + } + +Expected output (trimmed):: + + { + "profile": { + "summary": { + "total_time_ms": 33.34 + }, + "phases": { + "analyze": { "time_ms": 8.68 }, + "optimize": { "time_ms": 18.2 }, + "execute": { "time_ms": 4.87 }, + "format": { "time_ms": 0.05 } + }, + "plan": { + "node": "EnumerableCalc", + "time_ms": 4.82, + "rows": 2, + "children": [ + { "node": "CalciteEnumerableIndexScan", "time_ms": 4.12, "rows": 2 } + ] + } + } + } + Fetch Size (PPL) [Experimental] ================================ diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 57095c96790..55706a06ed7 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -271,7 +271,7 @@ public void onFailure(Exception e) { unifiedQueryHandler.execute( sqlRequest.getQuery(), QueryType.SQL, - false, + sqlRequest.isProfileEnabled(), new ActionListener<>() { @Override public void onResponse(TransportPPLQueryResponse response) { diff --git a/sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java b/sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java index df456d4d780..456ea212717 100644 --- a/sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java +++ b/sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java @@ -26,8 +26,9 @@ @RequiredArgsConstructor public class SQLQueryRequest { private static final String QUERY_FIELD_CURSOR = "cursor"; + private static final String QUERY_FIELD_PROFILE = "profile"; private static final Set SUPPORTED_FIELDS = - Set.of("query", "fetch_size", "parameters", QUERY_FIELD_CURSOR); + Set.of("query", "fetch_size", "parameters", QUERY_FIELD_CURSOR, QUERY_FIELD_PROFILE); private static final String QUERY_PARAMS_FORMAT = "format"; private static final String QUERY_PARAMS_SANITIZE = "sanitize"; private static final String QUERY_PARAMS_PRETTY = "pretty"; @@ -118,6 +119,14 @@ public boolean isExplainRequest() { return path.endsWith("/_explain"); } + /** Check if profiling should run for this request. */ + public boolean isProfileEnabled() { + return jsonContent != null + && jsonContent.optBoolean(QUERY_FIELD_PROFILE, false) + && !isExplainRequest() + && Format.JDBC.getFormatName().equalsIgnoreCase(format); + } + public boolean isCursorCloseRequest() { return path.endsWith("/close"); } diff --git a/sql/src/test/java/org/opensearch/sql/sql/domain/SQLQueryRequestTest.java b/sql/src/test/java/org/opensearch/sql/sql/domain/SQLQueryRequestTest.java index a8ac6c71c4e..e5f2400e6cb 100644 --- a/sql/src/test/java/org/opensearch/sql/sql/domain/SQLQueryRequestTest.java +++ b/sql/src/test/java/org/opensearch/sql/sql/domain/SQLQueryRequestTest.java @@ -284,6 +284,70 @@ public void should_support_raw_format() { assertTrue(csvRequest.isSupported()); } + @Test + public void should_support_query_with_profile_field() { + SQLQueryRequest request = + SQLQueryRequestBuilder.request("SELECT 1") + .jsonContent("{\"query\": \"SELECT 1\", \"profile\": true}") + .build(); + assertTrue(request.isSupported()); + } + + @Test + public void should_disable_profile_when_no_json_content() { + SQLQueryRequest request = + new SQLQueryRequest(null, "SELECT 1", "_plugins/_sql", Map.of(), null); + assertFalse(request.isProfileEnabled()); + } + + @Test + public void should_disable_profile_when_profile_field_absent() { + SQLQueryRequest request = + new SQLQueryRequest( + new JSONObject("{\"query\": \"SELECT 1\"}"), + "SELECT 1", + "_plugins/_sql", + Map.of(), + null); + assertFalse(request.isProfileEnabled()); + } + + @Test + public void should_enable_profile_for_jdbc_query() { + SQLQueryRequest request = + new SQLQueryRequest( + new JSONObject("{\"query\": \"SELECT 1\", \"profile\": true}"), + "SELECT 1", + "_plugins/_sql", + Map.of(), + null); + assertTrue(request.isProfileEnabled()); + } + + @Test + public void should_disable_profile_on_explain_path() { + SQLQueryRequest request = + new SQLQueryRequest( + new JSONObject("{\"query\": \"SELECT 1\", \"profile\": true}"), + "SELECT 1", + "_plugins/_sql/_explain", + Map.of(), + null); + assertFalse(request.isProfileEnabled()); + } + + @Test + public void should_disable_profile_for_non_jdbc_format() { + SQLQueryRequest request = + new SQLQueryRequest( + new JSONObject("{\"query\": \"SELECT 1\", \"profile\": true}"), + "SELECT 1", + "_plugins/_sql", + Map.of("format", "csv"), + null); + assertFalse(request.isProfileEnabled()); + } + /** SQL query request build helper to improve test data setup readability. */ private static class SQLQueryRequestBuilder { private String jsonContent; From b968d7f71958bd085e8f744e94d1d6489f96c901 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 28 May 2026 12:28:53 -0700 Subject: [PATCH 2/2] fix(analytics): preserve profile context across async listener thread MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit QueryPlanExecutor (analytics-framework 3.7+) fires its listener on a worker thread separate from the one that activated QueryProfiling. Without explicit handoff, the listener-side formatter reads an empty profile context, and the synchronous UnifiedQueryContext.close() via try-with-resources fires before async work finishes — closing the Calcite connection while it is still in use and clearing profile state before the listener can read it. - AnalyticsExecutionEngine.execute: capture the active ProfileContext and restore it via QueryProfiling.withCurrentContext inside the response callback so metric reads/writes see the same context. - RestUnifiedQueryAction.execute: defer UnifiedQueryContext.close to listener completion via ActionListener.runAfter, wrapped in a named wrapWithContextClose helper. - Add regression test simulating async listener delivery. Refs: #5317 Signed-off-by: Chen Dai --- .../analytics/AnalyticsExecutionEngine.java | 29 +++++++----- .../AnalyticsExecutionEngineTest.java | 46 +++++++++++++++++++ .../plugin/rest/RestUnifiedQueryAction.java | 22 +++++++-- 3 files changed, 83 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java index 1e997b15677..e62dc60a788 100644 --- a/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java +++ b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java @@ -33,7 +33,7 @@ import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.pagination.Cursor; import org.opensearch.sql.monitor.profile.MetricName; -import org.opensearch.sql.monitor.profile.ProfileMetric; +import org.opensearch.sql.monitor.profile.ProfileContext; import org.opensearch.sql.monitor.profile.QueryProfiling; import org.opensearch.sql.planner.physical.PhysicalPlan; @@ -81,7 +81,7 @@ public void execute( // to a worker pool and results arrive on the listener. Record the execute metric in the // listener callback, before delegating to the user-supplied listener, so the metric snapshot // taken by SimpleJsonResponseFormatter sees the correct value. - ProfileMetric execMetric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE); + ProfileContext profileCtx = QueryProfiling.current(); long execStart = System.nanoTime(); planExecutor.execute( @@ -90,15 +90,22 @@ public void execute( new ActionListener<>() { @Override public void onResponse(Iterable rows) { - try { - List fields = plan.getRowType().getFieldList(); - List results = convertRows(rows, fields); - Schema schema = buildSchema(fields); - execMetric.set(System.nanoTime() - execStart); - listener.onResponse(new QueryResponse(schema, results, Cursor.None)); - } catch (Exception e) { - listener.onFailure(e); - } + QueryProfiling.withCurrentContext( + profileCtx, + () -> { + try { + List fields = plan.getRowType().getFieldList(); + List results = convertRows(rows, fields); + Schema schema = buildSchema(fields); + profileCtx + .getOrCreateMetric(MetricName.EXECUTE) + .set(System.nanoTime() - execStart); + listener.onResponse(new QueryResponse(schema, results, Cursor.None)); + } catch (Exception e) { + listener.onFailure(e); + } + return null; + }); } @Override diff --git a/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java b/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java index 2fa48489df8..e759b128a45 100644 --- a/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java +++ b/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -37,6 +38,8 @@ import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse; import org.opensearch.sql.executor.ExecutionEngine.QueryResponse; +import org.opensearch.sql.monitor.profile.ProfileContext; +import org.opensearch.sql.monitor.profile.QueryProfiling; import org.opensearch.sql.planner.physical.PhysicalPlan; class AnalyticsExecutionEngineTest { @@ -330,6 +333,49 @@ void physicalPlanExplain_callsOnFailure() { + errorRef.get().getMessage()); } + @Test + @SuppressWarnings("unchecked") + void executeRelNode_profilePreservedOnAsyncListener() throws Exception { + ProfileContext expected = QueryProfiling.activate(true); + + RelNode relNode = mockRelNode("col", SqlTypeName.VARCHAR); + Iterable rows = Collections.singletonList(new Object[] {"value"}); + + // Fire the executor's listener on a different thread to simulate async dispatch + doAnswer( + inv -> { + ActionListener> al = inv.getArgument(2); + Thread t = new Thread(() -> al.onResponse(rows)); + t.start(); + t.join(); + return null; + }) + .when(mockExecutor) + .execute(eq(relNode), any(), any(ActionListener.class)); + + AtomicReference seen = new AtomicReference<>(); + engine.execute( + relNode, + mockContext, + new ResponseListener<>() { + @Override + public void onResponse(QueryResponse response) { + seen.set(QueryProfiling.current()); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + }); + + try { + assertSame(expected, seen.get(), "Profile context not restored on async listener thread"); + } finally { + QueryProfiling.clear(); + } + } + // --- helpers --- private QueryResponse executeAndCapture(RelNode relNode) { diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java index a9976ab2fc3..0a802393b6c 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java @@ -118,15 +118,18 @@ public void execute( .schedule( withCurrentContext( () -> { - try (UnifiedQueryContext context = buildContext(queryType, profiling)) { + UnifiedQueryContext context = buildContext(queryType, profiling); + ActionListener closingListener = + wrapWithContextClose(context, listener); + try { UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); RelNode plan = planner.plan(query); CalcitePlanContext planContext = context.getPlanContext(); plan = addQuerySizeLimit(plan, planContext); analyticsEngine.execute( - plan, planContext, createQueryListener(queryType, listener)); + plan, planContext, createQueryListener(queryType, closingListener)); } catch (Exception e) { - listener.onFailure(e); + closingListener.onFailure(e); } }), new TimeValue(0), @@ -256,4 +259,17 @@ private static Runnable withCurrentContext(final Runnable task) { task.run(); }; } + + private static ActionListener wrapWithContextClose( + UnifiedQueryContext context, ActionListener delegate) { + return ActionListener.runAfter( + delegate, + () -> { + try { + context.close(); + } catch (Exception e) { + LOG.warn("Failed to close query context", e); + } + }); + } }