Skip to content

Commit a4ea219

Browse files
committed
feat(api): Add profiling support to unified query API
Enable profiling in unified query components via UnifiedQueryContext: - profiling(boolean) builder to activate/deactivate profiling - measure(name, callable) API for user-profiled phases (execute, format) - getProfile() to retrieve QueryProfile snapshot after execution - Auto-profiling in UnifiedQueryPlanner (ANALYZE), UnifiedQueryCompiler (OPTIMIZE), and UnifiedQueryTranspiler (TRANSPILE) - All components use context.measure() internally for consistency - NoopProfileContext ensures zero overhead when profiling is disabled Signed-off-by: Chen Dai <daichen@amazon.com>
1 parent cbf538b commit a4ea219

7 files changed

Lines changed: 353 additions & 25 deletions

File tree

api/README.md

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ Use `UnifiedQueryTranspiler` to convert Calcite logical plans into SQL strings f
6262
```java
6363
UnifiedQueryTranspiler transpiler = UnifiedQueryTranspiler.builder()
6464
.dialect(SparkSqlDialect.DEFAULT)
65+
.context(context)
6566
.build();
6667

6768
String sql = transpiler.toSql(plan);
@@ -161,6 +162,7 @@ try (UnifiedQueryContext context = UnifiedQueryContext.builder()
161162
// Option A: Transpile to target SQL
162163
UnifiedQueryTranspiler transpiler = UnifiedQueryTranspiler.builder()
163164
.dialect(SparkSqlDialect.DEFAULT)
165+
.context(context)
164166
.build();
165167
String sparkSql = transpiler.toSql(plan);
166168
// Result: SELECT * FROM `catalog`.`employees` WHERE `age` > 30
@@ -176,6 +178,67 @@ try (UnifiedQueryContext context = UnifiedQueryContext.builder()
176178
}
177179
```
178180

181+
## Profiling
182+
183+
The unified query API supports the same [profiling capability](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/interfaces/endpoint.md#profile-experimental) as the PPL REST endpoint. When enabled, each unified query component automatically collects per-phase timing metrics. For code outside unified query components (e.g., `PreparedStatement.executeQuery()` or response formatting), `context.measure()` records custom phases into the same profile.
184+
185+
```java
186+
try (UnifiedQueryContext context = UnifiedQueryContext.builder()
187+
.language(QueryType.PPL)
188+
.catalog("catalog", schema)
189+
.defaultNamespace("catalog")
190+
.profiling(true)
191+
.build()) {
192+
193+
// Auto-profiled: ANALYZE
194+
RelNode plan = new UnifiedQueryPlanner(context).plan(query);
195+
196+
// Auto-profiled: OPTIMIZE
197+
PreparedStatement stmt = new UnifiedQueryCompiler(context).compile(plan);
198+
199+
// Auto-profiled: TRANSPILE
200+
String sql = UnifiedQueryTranspiler.builder()
201+
.dialect(SparkSqlDialect.DEFAULT)
202+
.context(context)
203+
.build()
204+
.toSql(plan);
205+
206+
// User-profiled via measure()
207+
ResultSet rs = context.measure("execute", stmt::executeQuery);
208+
String json = context.measure("format", () -> formatter.format(result));
209+
210+
// Retrieve profile snapshot
211+
QueryProfile profile = context.getProfile();
212+
}
213+
```
214+
215+
The returned `QueryProfile` follows the same JSON structure as the REST API:
216+
217+
```json
218+
{
219+
"summary": {
220+
"total_time_ms": 33.34
221+
},
222+
"phases": {
223+
"analyze": { "time_ms": 8.68 },
224+
"optimize": { "time_ms": 18.2 },
225+
"execute": { "time_ms": 4.87 },
226+
"format": { "time_ms": 0.05 },
227+
"transpile": { "time_ms": 0.82 }
228+
},
229+
"plan": {
230+
"node": "EnumerableCalc",
231+
"time_ms": 4.82,
232+
"rows": 2,
233+
"children": [
234+
{ "node": "CalciteEnumerableIndexScan", "time_ms": 4.12, "rows": 2 }
235+
]
236+
}
237+
}
238+
```
239+
240+
When profiling is disabled (the default), all components execute with zero overhead.
241+
179242
## Development & Testing
180243

181244
A set of unit tests is provided to validate planner behavior.

api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@
1111

1212
import java.util.HashMap;
1313
import java.util.List;
14+
import java.util.Locale;
1415
import java.util.Map;
1516
import java.util.Objects;
17+
import java.util.concurrent.Callable;
1618
import lombok.Value;
1719
import org.apache.calcite.jdbc.CalciteSchema;
1820
import org.apache.calcite.plan.RelTraitDef;
@@ -27,6 +29,11 @@
2729
import org.opensearch.sql.calcite.SysLimit;
2830
import org.opensearch.sql.common.setting.Settings;
2931
import org.opensearch.sql.executor.QueryType;
32+
import org.opensearch.sql.monitor.profile.MetricName;
33+
import org.opensearch.sql.monitor.profile.ProfileContext;
34+
import org.opensearch.sql.monitor.profile.ProfileMetric;
35+
import org.opensearch.sql.monitor.profile.QueryProfile;
36+
import org.opensearch.sql.monitor.profile.QueryProfiling;
3037

3138
/**
3239
* A reusable abstraction shared across unified query components (planner, compiler, etc.). This
@@ -42,13 +49,61 @@ public class UnifiedQueryContext implements AutoCloseable {
4249
/** Settings containing execution limits and feature flags used by parsers and planners. */
4350
Settings settings;
4451

52+
/**
53+
* Returns the profiling result. Call after query execution to retrieve collected metrics. Returns
54+
* null if profiling was not enabled.
55+
*/
56+
public QueryProfile getProfile() {
57+
ProfileContext ctx = QueryProfiling.current();
58+
return ctx.isEnabled() ? ctx.finish() : null;
59+
}
60+
61+
/**
62+
* Measures the execution time of the given action and records it as a profiling metric. When
63+
* profiling is disabled, the action executes with no overhead. Use this for phases outside
64+
* unified query components (e.g., execution, formatting).
65+
*
66+
* <p>Inspired by Micrometer's {@code Timer.record()} and Dropwizard's {@code Timer.time()}.
67+
*
68+
* @param <T> the return type of the action
69+
* @param name the metric name matching a {@link MetricName} value (case-insensitive, e.g.,
70+
* "execute", "format")
71+
* @param action the action to measure
72+
* @return the result of the action
73+
* @throws Exception if the action throws
74+
*/
75+
public <T> T measure(String name, Callable<T> action) throws Exception {
76+
return measure(MetricName.valueOf(name.toUpperCase(Locale.ROOT)), action);
77+
}
78+
79+
/**
80+
* Measures the execution time of the given action for the specified metric. Used internally by
81+
* unified query components for auto-profiling.
82+
*
83+
* @param <T> the return type of the action
84+
* @param metricName the metric to record
85+
* @param action the action to measure
86+
* @return the result of the action
87+
* @throws Exception if the action throws
88+
*/
89+
public <T> T measure(MetricName metricName, Callable<T> action) throws Exception {
90+
ProfileMetric metric = QueryProfiling.current().getOrCreateMetric(metricName);
91+
long start = System.nanoTime();
92+
try {
93+
return action.call();
94+
} finally {
95+
metric.set(System.nanoTime() - start);
96+
}
97+
}
98+
4599
/**
46100
* Closes the underlying resource managed by this context.
47101
*
48102
* @throws Exception if an error occurs while closing the connection
49103
*/
50104
@Override
51105
public void close() throws Exception {
106+
QueryProfiling.clear();
52107
if (planContext != null && planContext.connection != null) {
53108
planContext.connection.close();
54109
}
@@ -65,6 +120,7 @@ public static class Builder {
65120
private final Map<String, Schema> catalogs = new HashMap<>();
66121
private String defaultNamespace;
67122
private boolean cacheMetadata = false;
123+
private boolean profiling = false;
68124

69125
/**
70126
* Setting values with defaults from SysLimit.DEFAULT. Only includes planning-required settings
@@ -124,6 +180,18 @@ public Builder cacheMetadata(boolean cache) {
124180
return this;
125181
}
126182

183+
/**
184+
* Enables or disables query profiling. When enabled, profiling metrics are collected during
185+
* query planning and execution, retrievable via {@link UnifiedQueryContext#getProfile()}.
186+
*
187+
* @param enabled whether to enable profiling
188+
* @return this builder instance
189+
*/
190+
public Builder profiling(boolean enabled) {
191+
this.profiling = enabled;
192+
return this;
193+
}
194+
127195
/**
128196
* Sets a specific setting value by name.
129197
*
@@ -151,6 +219,7 @@ public UnifiedQueryContext build() {
151219
CalcitePlanContext planContext =
152220
CalcitePlanContext.create(
153221
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
222+
QueryProfiling.activate(profiling);
154223
return new UnifiedQueryContext(planContext, settings);
155224
}
156225

api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.opensearch.sql.common.antlr.Parser;
1919
import org.opensearch.sql.common.antlr.SyntaxCheckException;
2020
import org.opensearch.sql.executor.QueryType;
21+
import org.opensearch.sql.monitor.profile.MetricName;
2122
import org.opensearch.sql.ppl.antlr.PPLSyntaxParser;
2223
import org.opensearch.sql.ppl.parser.AstBuilder;
2324
import org.opensearch.sql.ppl.parser.AstStatementBuilder;
@@ -57,9 +58,8 @@ public UnifiedQueryPlanner(UnifiedQueryContext context) {
5758
*/
5859
public RelNode plan(String query) {
5960
try {
60-
return preserveCollation(analyze(parse(query)));
61+
return context.measure(MetricName.ANALYZE, () -> preserveCollation(analyze(parse(query))));
6162
} catch (SyntaxCheckException e) {
62-
// Re-throw syntax error without wrapping
6363
throw e;
6464
} catch (Exception e) {
6565
throw new IllegalStateException("Failed to plan query", e);

api/src/main/java/org/opensearch/sql/api/compiler/UnifiedQueryCompiler.java

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.apache.calcite.rel.logical.LogicalTableScan;
1717
import org.apache.calcite.tools.RelRunner;
1818
import org.opensearch.sql.api.UnifiedQueryContext;
19+
import org.opensearch.sql.monitor.profile.MetricName;
1920

2021
/**
2122
* {@code UnifiedQueryCompiler} compiles Calcite logical plans ({@link RelNode}) into executable
@@ -46,26 +47,29 @@ public UnifiedQueryCompiler(UnifiedQueryContext context) {
4647
*/
4748
public PreparedStatement compile(@NonNull RelNode plan) {
4849
try {
49-
// Apply shuttle to convert LogicalTableScan to BindableTableScan
50-
final RelHomogeneousShuttle shuttle =
51-
new RelHomogeneousShuttle() {
52-
@Override
53-
public RelNode visit(TableScan scan) {
54-
final RelOptTable table = scan.getTable();
55-
if (scan instanceof LogicalTableScan
56-
&& Bindables.BindableTableScan.canHandle(table)) {
57-
return Bindables.BindableTableScan.create(scan.getCluster(), table);
58-
}
59-
return super.visit(scan);
60-
}
61-
};
62-
RelNode transformedPlan = plan.accept(shuttle);
63-
64-
Connection connection = context.getPlanContext().connection;
65-
final RelRunner runner = connection.unwrap(RelRunner.class);
66-
return runner.prepareStatement(transformedPlan);
50+
return context.measure(MetricName.OPTIMIZE, () -> doCompile(plan));
6751
} catch (Exception e) {
6852
throw new IllegalStateException("Failed to compile logical plan", e);
6953
}
7054
}
55+
56+
private PreparedStatement doCompile(RelNode plan) throws Exception {
57+
// Apply shuttle to convert LogicalTableScan to BindableTableScan
58+
final RelHomogeneousShuttle shuttle =
59+
new RelHomogeneousShuttle() {
60+
@Override
61+
public RelNode visit(TableScan scan) {
62+
final RelOptTable table = scan.getTable();
63+
if (scan instanceof LogicalTableScan && Bindables.BindableTableScan.canHandle(table)) {
64+
return Bindables.BindableTableScan.create(scan.getCluster(), table);
65+
}
66+
return super.visit(scan);
67+
}
68+
};
69+
RelNode transformedPlan = plan.accept(shuttle);
70+
71+
Connection connection = context.getPlanContext().connection;
72+
final RelRunner runner = connection.unwrap(RelRunner.class);
73+
return runner.prepareStatement(transformedPlan);
74+
}
7175
}

api/src/main/java/org/opensearch/sql/api/transpiler/UnifiedQueryTranspiler.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,13 @@
66
package org.opensearch.sql.api.transpiler;
77

88
import lombok.Builder;
9+
import lombok.NonNull;
910
import org.apache.calcite.rel.RelNode;
1011
import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
1112
import org.apache.calcite.sql.SqlDialect;
1213
import org.apache.calcite.sql.SqlNode;
14+
import org.opensearch.sql.api.UnifiedQueryContext;
15+
import org.opensearch.sql.monitor.profile.MetricName;
1316

1417
/**
1518
* Transpiles Calcite logical plans ({@link RelNode}) into SQL strings for various target databases.
@@ -22,6 +25,9 @@ public class UnifiedQueryTranspiler {
2225
/** Target SQL dialect */
2326
private final SqlDialect dialect;
2427

28+
/** Unified query context for profiling support. */
29+
@NonNull private final UnifiedQueryContext context;
30+
2531
/**
2632
* Converts a Calcite logical plan to a SQL string using the configured target dialect.
2733
*
@@ -30,11 +36,15 @@ public class UnifiedQueryTranspiler {
3036
*/
3137
public String toSql(RelNode plan) {
3238
try {
33-
RelToSqlConverter converter = new RelToSqlConverter(dialect);
34-
SqlNode sqlNode = converter.visitRoot(plan).asStatement();
35-
return sqlNode.toSqlString(dialect).getSql();
39+
return context.measure(MetricName.TRANSPILE, () -> doTranspile(plan));
3640
} catch (Exception e) {
3741
throw new IllegalStateException("Failed to transpile logical plan to SQL", e);
3842
}
3943
}
44+
45+
private String doTranspile(RelNode plan) {
46+
RelToSqlConverter converter = new RelToSqlConverter(dialect);
47+
SqlNode sqlNode = converter.visitRoot(plan).asStatement();
48+
return sqlNode.toSqlString(dialect).getSql();
49+
}
4050
}

0 commit comments

Comments
 (0)