Skip to content

Commit f0bcbab

Browse files
authored
feat(api): Add profiling support to unified query API (#5268)
Add query profiling infrastructure that measures time spent in each query phase (analyze, optimize, execute, format). Profiling is opt-in via UnifiedQueryContext.builder().profiling(true) and uses thread-local context to avoid passing profiling state through every method. Key changes: - QueryProfiling/ProfileContext for thread-local profiling lifecycle - UnifiedQueryContext.measure() API for timing arbitrary phases - Auto-profiling in UnifiedQueryPlanner (analyze) and compiler (optimize) - UnifiedQueryTestBase shared test fixture for unified query tests - Comprehensive profiling tests with non-flaky >= 0 timing assertions Signed-off-by: Chen Dai <daichen@amazon.com>
1 parent 5f304f7 commit f0bcbab

6 files changed

Lines changed: 250 additions & 26 deletions

File tree

api/README.md

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,59 @@ try (UnifiedQueryContext context = UnifiedQueryContext.builder()
179179
}
180180
```
181181

182+
## Profiling
183+
184+
The unified query API supports the same [profiling capability](../docs/user/ppl/interfaces/endpoint.md#profile-experimental) as the PPL REST endpoint. When enabled, each unified query component automatically collects per-phase timing metrics. For code outside unified query components (e.g., `PreparedStatement.executeQuery()` or response formatting), `context.measure()` records custom phases into the same profile.
185+
186+
```java
187+
try (UnifiedQueryContext context = UnifiedQueryContext.builder()
188+
.language(QueryType.PPL)
189+
.catalog("catalog", schema)
190+
.defaultNamespace("catalog")
191+
.profiling(true)
192+
.build()) {
193+
194+
// Auto-profiled: ANALYZE
195+
RelNode plan = new UnifiedQueryPlanner(context).plan(query);
196+
197+
// Auto-profiled: OPTIMIZE
198+
PreparedStatement stmt = new UnifiedQueryCompiler(context).compile(plan);
199+
200+
// User-profiled via measure()
201+
ResultSet rs = context.measure(MetricName.EXECUTE, stmt::executeQuery);
202+
String json = context.measure(MetricName.FORMAT, () -> formatter.format(result));
203+
204+
// Retrieve profile snapshot
205+
QueryProfile profile = context.getProfile();
206+
}
207+
```
208+
209+
The returned `QueryProfile` follows the same JSON structure as the REST API:
210+
211+
```json
212+
{
213+
"summary": {
214+
"total_time_ms": 33.34
215+
},
216+
"phases": {
217+
"analyze": { "time_ms": 8.68 },
218+
"optimize": { "time_ms": 18.2 },
219+
"execute": { "time_ms": 4.87 },
220+
"format": { "time_ms": 0.05 }
221+
},
222+
"plan": {
223+
"node": "EnumerableCalc",
224+
"time_ms": 4.82,
225+
"rows": 2,
226+
"children": [
227+
{ "node": "CalciteEnumerableIndexScan", "time_ms": 4.12, "rows": 2 }
228+
]
229+
}
230+
}
231+
```
232+
233+
When profiling is disabled (the default), all components execute with zero overhead.
234+
182235
## Development & Testing
183236

184237
A set of unit tests is provided to validate planner behavior.

api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import java.util.List;
1414
import java.util.Map;
1515
import java.util.Objects;
16+
import java.util.Optional;
17+
import java.util.concurrent.Callable;
1618
import lombok.Value;
1719
import org.apache.calcite.avatica.util.Casing;
1820
import org.apache.calcite.jdbc.CalciteSchema;
@@ -28,6 +30,10 @@
2830
import org.opensearch.sql.calcite.SysLimit;
2931
import org.opensearch.sql.common.setting.Settings;
3032
import org.opensearch.sql.executor.QueryType;
33+
import org.opensearch.sql.monitor.profile.MetricName;
34+
import org.opensearch.sql.monitor.profile.ProfileMetric;
35+
import org.opensearch.sql.monitor.profile.QueryProfile;
36+
import org.opensearch.sql.monitor.profile.QueryProfiling;
3137

3238
/**
3339
* A reusable abstraction shared across unified query components (planner, compiler, etc.). This
@@ -43,13 +49,43 @@ public class UnifiedQueryContext implements AutoCloseable {
4349
/** Settings containing execution limits and feature flags used by parsers and planners. */
4450
Settings settings;
4551

52+
/**
53+
* Returns the profiling result. Call after query execution to retrieve collected metrics. Returns
54+
* empty if profiling was not enabled.
55+
*/
56+
public Optional<QueryProfile> getProfile() {
57+
return Optional.ofNullable(QueryProfiling.current().finish());
58+
}
59+
60+
/**
61+
* Measures the execution time of the given action and records it as a profiling metric. When
62+
* profiling is disabled, the action executes with no overhead. Use this for phases outside
63+
* unified query components (e.g., execution, formatting).
64+
*
65+
* @param <T> the return type of the action
66+
* @param metricName the metric to record
67+
* @param action the action to measure
68+
* @return the result of the action
69+
* @throws Exception if the action throws
70+
*/
71+
public <T> T measure(MetricName metricName, Callable<T> action) throws Exception {
72+
ProfileMetric metric = QueryProfiling.current().getOrCreateMetric(metricName);
73+
long start = System.nanoTime();
74+
try {
75+
return action.call();
76+
} finally {
77+
metric.set(System.nanoTime() - start);
78+
}
79+
}
80+
4681
/**
4782
* Closes the underlying resource managed by this context.
4883
*
4984
* @throws Exception if an error occurs while closing the connection
5085
*/
5186
@Override
5287
public void close() throws Exception {
88+
QueryProfiling.clear();
5389
if (planContext != null && planContext.connection != null) {
5490
planContext.connection.close();
5591
}
@@ -66,6 +102,7 @@ public static class Builder {
66102
private final Map<String, Schema> catalogs = new HashMap<>();
67103
private String defaultNamespace;
68104
private boolean cacheMetadata = false;
105+
private boolean profiling = false;
69106

70107
/**
71108
* Setting values with defaults from SysLimit.DEFAULT. Only includes planning-required settings
@@ -125,6 +162,18 @@ public Builder cacheMetadata(boolean cache) {
125162
return this;
126163
}
127164

165+
/**
166+
* Enables or disables query profiling. When enabled, profiling metrics are collected during
167+
* query planning and execution, retrievable via {@link UnifiedQueryContext#getProfile()}.
168+
*
169+
* @param enabled whether to enable profiling
170+
* @return this builder instance
171+
*/
172+
public Builder profiling(boolean enabled) {
173+
this.profiling = enabled;
174+
return this;
175+
}
176+
128177
/**
129178
* Sets a specific setting value by name.
130179
*
@@ -152,6 +201,7 @@ public UnifiedQueryContext build() {
152201
CalcitePlanContext planContext =
153202
CalcitePlanContext.create(
154203
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
204+
QueryProfiling.activate(profiling);
155205
return new UnifiedQueryContext(planContext, settings);
156206
}
157207

api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package org.opensearch.sql.api;
77

8+
import static org.opensearch.sql.monitor.profile.MetricName.ANALYZE;
9+
810
import lombok.RequiredArgsConstructor;
911
import org.antlr.v4.runtime.tree.ParseTree;
1012
import org.apache.calcite.rel.RelCollation;
@@ -36,12 +38,16 @@ public class UnifiedQueryPlanner {
3638
/** Planning strategy selected at construction time based on query type. */
3739
private final PlanningStrategy strategy;
3840

41+
/** Unified query context for profiling support. */
42+
private final UnifiedQueryContext context;
43+
3944
/**
4045
* Constructs a UnifiedQueryPlanner with a unified query context.
4146
*
4247
* @param context the unified query context containing CalcitePlanContext
4348
*/
4449
public UnifiedQueryPlanner(UnifiedQueryContext context) {
50+
this.context = context;
4551
this.strategy =
4652
context.getPlanContext().queryType == QueryType.SQL
4753
? new CalciteNativeStrategy(context)
@@ -57,7 +63,7 @@ public UnifiedQueryPlanner(UnifiedQueryContext context) {
5763
*/
5864
public RelNode plan(String query) {
5965
try {
60-
return strategy.plan(query);
66+
return context.measure(ANALYZE, () -> strategy.plan(query));
6167
} catch (SyntaxCheckException e) {
6268
// Re-throw syntax error without wrapping
6369
throw e;

api/src/main/java/org/opensearch/sql/api/compiler/UnifiedQueryCompiler.java

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package org.opensearch.sql.api.compiler;
77

8+
import static org.opensearch.sql.monitor.profile.MetricName.OPTIMIZE;
9+
810
import java.sql.Connection;
911
import java.sql.PreparedStatement;
1012
import lombok.NonNull;
@@ -46,26 +48,29 @@ public UnifiedQueryCompiler(UnifiedQueryContext context) {
4648
*/
4749
public PreparedStatement compile(@NonNull RelNode plan) {
4850
try {
49-
// Apply shuttle to convert LogicalTableScan to BindableTableScan
50-
final RelHomogeneousShuttle shuttle =
51-
new RelHomogeneousShuttle() {
52-
@Override
53-
public RelNode visit(TableScan scan) {
54-
final RelOptTable table = scan.getTable();
55-
if (scan instanceof LogicalTableScan
56-
&& Bindables.BindableTableScan.canHandle(table)) {
57-
return Bindables.BindableTableScan.create(scan.getCluster(), table);
58-
}
59-
return super.visit(scan);
60-
}
61-
};
62-
RelNode transformedPlan = plan.accept(shuttle);
63-
64-
Connection connection = context.getPlanContext().connection;
65-
final RelRunner runner = connection.unwrap(RelRunner.class);
66-
return runner.prepareStatement(transformedPlan);
51+
return context.measure(OPTIMIZE, () -> doCompile(plan));
6752
} catch (Exception e) {
6853
throw new IllegalStateException("Failed to compile logical plan", e);
6954
}
7055
}
56+
57+
private PreparedStatement doCompile(RelNode plan) throws Exception {
58+
// Apply shuttle to convert LogicalTableScan to BindableTableScan
59+
final RelHomogeneousShuttle shuttle =
60+
new RelHomogeneousShuttle() {
61+
@Override
62+
public RelNode visit(TableScan scan) {
63+
final RelOptTable table = scan.getTable();
64+
if (scan instanceof LogicalTableScan && Bindables.BindableTableScan.canHandle(table)) {
65+
return Bindables.BindableTableScan.create(scan.getCluster(), table);
66+
}
67+
return super.visit(scan);
68+
}
69+
};
70+
RelNode transformedPlan = plan.accept(shuttle);
71+
72+
Connection connection = context.getPlanContext().connection;
73+
final RelRunner runner = connection.unwrap(RelRunner.class);
74+
return runner.prepareStatement(transformedPlan);
75+
}
7176
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.api;
7+
8+
import static org.junit.Assert.assertEquals;
9+
import static org.junit.Assert.assertFalse;
10+
import static org.junit.Assert.assertNotNull;
11+
import static org.junit.Assert.assertThrows;
12+
import static org.junit.Assert.assertTrue;
13+
import static org.opensearch.sql.monitor.profile.MetricName.EXECUTE;
14+
15+
import java.io.IOException;
16+
import java.sql.PreparedStatement;
17+
import java.sql.ResultSet;
18+
import org.apache.calcite.rel.RelNode;
19+
import org.junit.Test;
20+
import org.opensearch.sql.api.compiler.UnifiedQueryCompiler;
21+
import org.opensearch.sql.monitor.profile.QueryProfile;
22+
import org.opensearch.sql.monitor.profile.QueryProfiling;
23+
24+
/** Tests for profiling across unified query components and the measure() API. */
25+
public class UnifiedQueryProfilingTest extends UnifiedQueryTestBase {
26+
27+
@Override
28+
protected UnifiedQueryContext.Builder contextBuilder() {
29+
return super.contextBuilder().profiling(true);
30+
}
31+
32+
@Test
33+
public void testProfilingEnabled() {
34+
assertTrue(QueryProfiling.current().isEnabled());
35+
}
36+
37+
@Test
38+
public void testProfilingDisabledByDefault() throws Exception {
39+
try (UnifiedQueryContext ctx = super.contextBuilder().build()) {
40+
assertFalse(QueryProfiling.current().isEnabled());
41+
}
42+
}
43+
44+
@Test
45+
public void testGetProfileReturnsEmptyWhenDisabled() throws Exception {
46+
try (UnifiedQueryContext ctx = super.contextBuilder().build()) {
47+
assertFalse(ctx.getProfile().isPresent());
48+
}
49+
}
50+
51+
@Test
52+
public void testMeasureExecutesWhenProfilingDisabled() throws Exception {
53+
try (UnifiedQueryContext ctx = super.contextBuilder().build()) {
54+
assertEquals("done", ctx.measure(EXECUTE, () -> "done"));
55+
assertFalse(ctx.getProfile().isPresent());
56+
}
57+
}
58+
59+
@Test
60+
public void testPlannerAutoProfilesAnalyzePhase() {
61+
planner.plan("source = catalog.employees");
62+
assertTrue(context.getProfile().get().getPhases().get("analyze").getTimeMillis() >= 0);
63+
}
64+
65+
@Test
66+
public void testCompilerAutoProfilesOptimizePhase() {
67+
RelNode plan = planner.plan("source = catalog.employees");
68+
new UnifiedQueryCompiler(context).compile(plan);
69+
assertTrue(context.getProfile().get().getPhases().get("optimize").getTimeMillis() >= 0);
70+
}
71+
72+
@Test
73+
public void testMeasureRecordsMetric() throws Exception {
74+
assertEquals("done", context.measure(EXECUTE, () -> "done"));
75+
assertTrue(context.getProfile().get().getPhases().get("execute").getTimeMillis() >= 0);
76+
}
77+
78+
@Test
79+
public void testFullPipelineProfiling() throws Exception {
80+
RelNode plan = planner.plan("source = catalog.employees");
81+
PreparedStatement stmt = new UnifiedQueryCompiler(context).compile(plan);
82+
ResultSet rs = context.measure(EXECUTE, stmt::executeQuery);
83+
84+
QueryProfile profile = context.getProfile().get();
85+
assertTrue(profile.getSummary().getTotalTimeMillis() >= 0);
86+
assertTrue(profile.getPhases().get("analyze").getTimeMillis() >= 0);
87+
assertTrue(profile.getPhases().get("optimize").getTimeMillis() >= 0);
88+
assertTrue(profile.getPhases().get("execute").getTimeMillis() >= 0);
89+
assertNotNull(profile.getPlan());
90+
}
91+
92+
@Test
93+
public void testProfilingClearedAfterClose() throws Exception {
94+
assertTrue(QueryProfiling.current().isEnabled());
95+
context.close();
96+
assertFalse(QueryProfiling.current().isEnabled());
97+
}
98+
99+
@Test
100+
public void testMeasurePropagatesException() {
101+
assertThrows(
102+
IOException.class,
103+
() ->
104+
context.measure(
105+
EXECUTE,
106+
() -> {
107+
throw new IOException("test error");
108+
}));
109+
}
110+
}

api/src/testFixtures/java/org/opensearch/sql/api/UnifiedQueryTestBase.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ protected Map<String, Table> getTableMap() {
5858
}
5959
};
6060

61-
context = buildContext(queryType());
61+
context = contextBuilder().build();
6262
planner = new UnifiedQueryPlanner(context);
6363
}
6464

@@ -69,12 +69,12 @@ protected QueryType queryType() {
6969
return QueryType.PPL;
7070
}
7171

72-
/** Builds a UnifiedQueryContext with the test schema for the given query type. */
73-
protected UnifiedQueryContext buildContext(QueryType queryType) {
74-
return UnifiedQueryContext.builder()
75-
.language(queryType)
76-
.catalog(DEFAULT_CATALOG, testSchema)
77-
.build();
72+
/**
73+
* Creates a pre-configured context builder with test schema. Subclasses can override to customize
74+
* context configuration (e.g., enable profiling).
75+
*/
76+
protected UnifiedQueryContext.Builder contextBuilder() {
77+
return UnifiedQueryContext.builder().language(queryType()).catalog(DEFAULT_CATALOG, testSchema);
7878
}
7979

8080
@After

0 commit comments

Comments
 (0)