Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.pagination.Cursor;
import org.opensearch.sql.monitor.profile.MetricName;
import org.opensearch.sql.monitor.profile.ProfileMetric;
import org.opensearch.sql.monitor.profile.ProfileContext;
import org.opensearch.sql.monitor.profile.QueryProfiling;
import org.opensearch.sql.planner.physical.PhysicalPlan;

Expand Down Expand Up @@ -81,7 +81,7 @@ public void execute(
// to a worker pool and results arrive on the listener. Record the execute metric in the
// listener callback, before delegating to the user-supplied listener, so the metric snapshot
// taken by SimpleJsonResponseFormatter sees the correct value.
ProfileMetric execMetric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE);
ProfileContext profileCtx = QueryProfiling.current();
long execStart = System.nanoTime();

planExecutor.execute(
Expand All @@ -90,15 +90,22 @@ public void execute(
new ActionListener<>() {
@Override
public void onResponse(Iterable<Object[]> rows) {
try {
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
List<ExprValue> results = convertRows(rows, fields);
Schema schema = buildSchema(fields);
execMetric.set(System.nanoTime() - execStart);
listener.onResponse(new QueryResponse(schema, results, Cursor.None));
} catch (Exception e) {
listener.onFailure(e);
}
QueryProfiling.withCurrentContext(
profileCtx,
() -> {
try {
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
List<ExprValue> results = convertRows(rows, fields);
Schema schema = buildSchema(fields);
profileCtx
.getOrCreateMetric(MetricName.EXECUTE)
.set(System.nanoTime() - execStart);
listener.onResponse(new QueryResponse(schema, results, Cursor.None));
} catch (Exception e) {
listener.onFailure(e);
}
return null;
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -37,6 +38,8 @@
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse;
import org.opensearch.sql.executor.ExecutionEngine.QueryResponse;
import org.opensearch.sql.monitor.profile.ProfileContext;
import org.opensearch.sql.monitor.profile.QueryProfiling;
import org.opensearch.sql.planner.physical.PhysicalPlan;

class AnalyticsExecutionEngineTest {
Expand Down Expand Up @@ -330,6 +333,49 @@ void physicalPlanExplain_callsOnFailure() {
+ errorRef.get().getMessage());
}

@Test
@SuppressWarnings("unchecked")
void executeRelNode_profilePreservedOnAsyncListener() throws Exception {
ProfileContext expected = QueryProfiling.activate(true);

RelNode relNode = mockRelNode("col", SqlTypeName.VARCHAR);
Iterable<Object[]> rows = Collections.singletonList(new Object[] {"value"});

// Fire the executor's listener on a different thread to simulate async dispatch
doAnswer(
inv -> {
ActionListener<Iterable<Object[]>> al = inv.getArgument(2);
Thread t = new Thread(() -> al.onResponse(rows));
t.start();
t.join();
return null;
})
.when(mockExecutor)
.execute(eq(relNode), any(), any(ActionListener.class));

AtomicReference<ProfileContext> seen = new AtomicReference<>();
engine.execute(
relNode,
mockContext,
new ResponseListener<>() {
@Override
public void onResponse(QueryResponse response) {
seen.set(QueryProfiling.current());
}

@Override
public void onFailure(Exception e) {
throw new AssertionError(e);
}
});

try {
assertSame(expected, seen.get(), "Profile context not restored on async listener thread");
} finally {
QueryProfiling.clear();
}
}

// --- helpers ---

private QueryResponse executeAndCapture(RelNode relNode) {
Expand Down
52 changes: 52 additions & 0 deletions docs/user/interfaces/endpoint.rst
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,58 @@ Result set::
"status": 200
}

Profile [Experimental]
======================

Description
-----------

Profiling captures per-stage timings (in milliseconds) for SQL query
execution. To enable profiling, set ``"profile": true`` in the request
body alongside ``"query"``.

.. note::
The ``profile`` parameter only takes effect when the query runs on
the Analytics Engine. In all other cases the flag is silently ignored.

Profile output is returned only for regular query execution (not
``_explain``) and only with the default ``format=jdbc``.

Example
-------

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this example an integration test? I am wondering if we can have an integration test for profiling(assert each metric to be greater than 0)?

@dai-chen dai-chen May 28, 2026

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did this manually against OS cluster with SQL + AE. I'm also thinking can we add some dedicated IT for AE. Let me check after this.


Request::

POST /_plugins/_sql
{
"query": "SELECT customer_id, SUM(amount) FROM orders GROUP BY customer_id",
"profile": true
}

Expected output (trimmed)::

{
"profile": {
"summary": {
"total_time_ms": 33.34
},
"phases": {
"analyze": { "time_ms": 8.68 },
"optimize": { "time_ms": 18.2 },
"execute": { "time_ms": 4.87 },
"format": { "time_ms": 0.05 }
},
"plan": {
"node": "EnumerableCalc",
"time_ms": 4.82,
"rows": 2,
"children": [
{ "node": "CalciteEnumerableIndexScan", "time_ms": 4.12, "rows": 2 }
]
}
}
}

Fetch Size (PPL) [Experimental]
================================

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public void onFailure(Exception e) {
unifiedQueryHandler.execute(
sqlRequest.getQuery(),
QueryType.SQL,
false,
sqlRequest.isProfileEnabled(),
new ActionListener<>() {
@Override
public void onResponse(TransportPPLQueryResponse response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,18 @@ public void execute(
.schedule(
withCurrentContext(
() -> {
try (UnifiedQueryContext context = buildContext(queryType, profiling)) {
UnifiedQueryContext context = buildContext(queryType, profiling);
ActionListener<TransportPPLQueryResponse> closingListener =
wrapWithContextClose(context, listener);
try {
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
RelNode plan = planner.plan(query);
CalcitePlanContext planContext = context.getPlanContext();
plan = addQuerySizeLimit(plan, planContext);
analyticsEngine.execute(
plan, planContext, createQueryListener(queryType, listener));
plan, planContext, createQueryListener(queryType, closingListener));
} catch (Exception e) {
listener.onFailure(e);
closingListener.onFailure(e);
}
}),
new TimeValue(0),
Expand Down Expand Up @@ -256,4 +259,17 @@ private static Runnable withCurrentContext(final Runnable task) {
task.run();
};
}

private static ActionListener<TransportPPLQueryResponse> wrapWithContextClose(
UnifiedQueryContext context, ActionListener<TransportPPLQueryResponse> delegate) {
return ActionListener.runAfter(
delegate,
() -> {
try {
context.close();
} catch (Exception e) {
LOG.warn("Failed to close query context", e);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
@RequiredArgsConstructor
public class SQLQueryRequest {
private static final String QUERY_FIELD_CURSOR = "cursor";
private static final String QUERY_FIELD_PROFILE = "profile";
private static final Set<String> SUPPORTED_FIELDS =
Set.of("query", "fetch_size", "parameters", QUERY_FIELD_CURSOR);
Set.of("query", "fetch_size", "parameters", QUERY_FIELD_CURSOR, QUERY_FIELD_PROFILE);
private static final String QUERY_PARAMS_FORMAT = "format";
private static final String QUERY_PARAMS_SANITIZE = "sanitize";
private static final String QUERY_PARAMS_PRETTY = "pretty";
Expand Down Expand Up @@ -118,6 +119,14 @@ public boolean isExplainRequest() {
return path.endsWith("/_explain");
}

/** Check if profiling should run for this request. */
public boolean isProfileEnabled() {
return jsonContent != null
&& jsonContent.optBoolean(QUERY_FIELD_PROFILE, false)
&& !isExplainRequest()
&& Format.JDBC.getFormatName().equalsIgnoreCase(format);
}

public boolean isCursorCloseRequest() {
return path.endsWith("/close");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,70 @@ public void should_support_raw_format() {
assertTrue(csvRequest.isSupported());
}

@Test
public void should_support_query_with_profile_field() {
SQLQueryRequest request =
SQLQueryRequestBuilder.request("SELECT 1")
.jsonContent("{\"query\": \"SELECT 1\", \"profile\": true}")
.build();
assertTrue(request.isSupported());
}

@Test
public void should_disable_profile_when_no_json_content() {
SQLQueryRequest request =
new SQLQueryRequest(null, "SELECT 1", "_plugins/_sql", Map.of(), null);
assertFalse(request.isProfileEnabled());
}

@Test
public void should_disable_profile_when_profile_field_absent() {
SQLQueryRequest request =
new SQLQueryRequest(
new JSONObject("{\"query\": \"SELECT 1\"}"),
"SELECT 1",
"_plugins/_sql",
Map.of(),
null);
assertFalse(request.isProfileEnabled());
}

@Test
public void should_enable_profile_for_jdbc_query() {
SQLQueryRequest request =
new SQLQueryRequest(
new JSONObject("{\"query\": \"SELECT 1\", \"profile\": true}"),
"SELECT 1",
"_plugins/_sql",
Map.of(),
null);
assertTrue(request.isProfileEnabled());
}

@Test
public void should_disable_profile_on_explain_path() {
SQLQueryRequest request =
new SQLQueryRequest(
new JSONObject("{\"query\": \"SELECT 1\", \"profile\": true}"),
"SELECT 1",
"_plugins/_sql/_explain",
Map.of(),
null);
assertFalse(request.isProfileEnabled());
}

@Test
public void should_disable_profile_for_non_jdbc_format() {
SQLQueryRequest request =
new SQLQueryRequest(
new JSONObject("{\"query\": \"SELECT 1\", \"profile\": true}"),
"SELECT 1",
"_plugins/_sql",
Map.of("format", "csv"),
null);
assertFalse(request.isProfileEnabled());
}

/** SQL query request build helper to improve test data setup readability. */
private static class SQLQueryRequestBuilder {
private String jsonContent;
Expand Down
Loading