Skip to content

Commit a9cdf7b

Browse files
committed
feat(api): Add profiling support to unified query API
Enable profiling in unified query components via UnifiedQueryContext:
- profiling(boolean) builder to activate/deactivate profiling
- measure(name, callable) API for user-profiled phases (execute, format)
- getProfile() to retrieve QueryProfile snapshot after execution
- Auto-profiling in UnifiedQueryPlanner (ANALYZE), UnifiedQueryCompiler (OPTIMIZE), and UnifiedQueryTranspiler (TRANSPILE)
- All components use context.measure() internally for consistency
- NoopProfileContext ensures zero overhead when profiling is disabled

Signed-off-by: Chen Dai <daichen@amazon.com>
1 parent cbf538b commit a9cdf7b

7 files changed

Lines changed: 329 additions & 34 deletions

File tree

api/README.md

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,7 @@ RelNode plan2 = planner.plan("source = metrics | stats avg(cpu)");
6060
Use `UnifiedQueryTranspiler` to convert Calcite logical plans into SQL strings for target databases. The transpiler supports various SQL dialects through Calcite's `SqlDialect` interface.
6161

6262
```java
63-
UnifiedQueryTranspiler transpiler = UnifiedQueryTranspiler.builder()
64-
.dialect(SparkSqlDialect.DEFAULT)
65-
.build();
63+
UnifiedQueryTranspiler transpiler = new UnifiedQueryTranspiler(context, SparkSqlDialect.DEFAULT);
6664

6765
String sql = transpiler.toSql(plan);
6866
```
@@ -159,9 +157,7 @@ try (UnifiedQueryContext context = UnifiedQueryContext.builder()
159157
RelNode plan = planner.plan("source = employees | where age > 30");
160158

161159
// Option A: Transpile to target SQL
162-
UnifiedQueryTranspiler transpiler = UnifiedQueryTranspiler.builder()
163-
.dialect(SparkSqlDialect.DEFAULT)
164-
.build();
160+
UnifiedQueryTranspiler transpiler = new UnifiedQueryTranspiler(context, SparkSqlDialect.DEFAULT);
165161
String sparkSql = transpiler.toSql(plan);
166162
// Result: SELECT * FROM `catalog`.`employees` WHERE `age` > 30
167163

@@ -176,6 +172,63 @@ try (UnifiedQueryContext context = UnifiedQueryContext.builder()
176172
}
177173
```
178174

175+
## Profiling
176+
177+
The unified query API supports the same [profiling capability](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/interfaces/endpoint.md#profile-experimental) as the PPL REST endpoint. When enabled, each unified query component automatically collects per-phase timing metrics. For code outside unified query components (e.g., `PreparedStatement.executeQuery()` or response formatting), `context.measure()` records custom phases into the same profile.
178+
179+
```java
180+
try (UnifiedQueryContext context = UnifiedQueryContext.builder()
181+
.language(QueryType.PPL)
182+
.catalog("catalog", schema)
183+
.defaultNamespace("catalog")
184+
.profiling(true)
185+
.build()) {
186+
187+
// Auto-profiled: ANALYZE
188+
RelNode plan = new UnifiedQueryPlanner(context).plan(query);
189+
190+
// Auto-profiled: OPTIMIZE
191+
PreparedStatement stmt = new UnifiedQueryCompiler(context).compile(plan);
192+
193+
// Auto-profiled: TRANSPILE
194+
String sql = new UnifiedQueryTranspiler(context, SparkSqlDialect.DEFAULT).toSql(plan);
195+
196+
// User-profiled via measure()
197+
ResultSet rs = context.measure(MetricName.EXECUTE, stmt::executeQuery);
198+
String json = context.measure(MetricName.FORMAT, () -> formatter.format(rs));
199+
200+
// Retrieve profile snapshot
201+
QueryProfile profile = context.getProfile();
202+
}
203+
```
204+
205+
The returned `QueryProfile` follows the same JSON structure as the REST API:
206+
207+
```json
208+
{
209+
"summary": {
210+
"total_time_ms": 33.34
211+
},
212+
"phases": {
213+
"analyze": { "time_ms": 8.68 },
214+
"optimize": { "time_ms": 18.2 },
215+
"execute": { "time_ms": 4.87 },
216+
"format": { "time_ms": 0.05 },
217+
"transpile": { "time_ms": 0.82 }
218+
},
219+
"plan": {
220+
"node": "EnumerableCalc",
221+
"time_ms": 4.82,
222+
"rows": 2,
223+
"children": [
224+
{ "node": "CalciteEnumerableIndexScan", "time_ms": 4.12, "rows": 2 }
225+
]
226+
}
227+
}
228+
```
229+
230+
When profiling is disabled (the default), metrics are recorded into a no-op profile context, so all components execute with negligible overhead.
231+
179232
## Development & Testing
180233

181234
A set of unit tests is provided to validate planner behavior.

api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.List;
1414
import java.util.Map;
1515
import java.util.Objects;
16+
import java.util.concurrent.Callable;
1617
import lombok.Value;
1718
import org.apache.calcite.jdbc.CalciteSchema;
1819
import org.apache.calcite.plan.RelTraitDef;
@@ -27,6 +28,11 @@
2728
import org.opensearch.sql.calcite.SysLimit;
2829
import org.opensearch.sql.common.setting.Settings;
2930
import org.opensearch.sql.executor.QueryType;
31+
import org.opensearch.sql.monitor.profile.MetricName;
32+
import org.opensearch.sql.monitor.profile.ProfileContext;
33+
import org.opensearch.sql.monitor.profile.ProfileMetric;
34+
import org.opensearch.sql.monitor.profile.QueryProfile;
35+
import org.opensearch.sql.monitor.profile.QueryProfiling;
3036

3137
/**
3238
* A reusable abstraction shared across unified query components (planner, compiler, etc.). This
@@ -42,13 +48,46 @@ public class UnifiedQueryContext implements AutoCloseable {
4248
/** Settings containing execution limits and feature flags used by parsers and planners. */
4349
Settings settings;
4450

51+
/**
52+
* Returns the profiling result. Call after query execution to retrieve collected metrics. Returns
53+
* null if profiling was not enabled.
54+
*/
55+
public QueryProfile getProfile() {
56+
ProfileContext ctx = QueryProfiling.current();
57+
return ctx.isEnabled() ? ctx.finish() : null;
58+
}
59+
60+
/**
61+
* Measures the execution time of the given action and records it as a profiling metric. When
62+
* profiling is disabled, the action executes with no overhead. Use this for phases outside
63+
* unified query components (e.g., execution, formatting).
64+
*
65+
* <p>Inspired by Micrometer's {@code Timer.record()} and Dropwizard's {@code Timer.time()}.
66+
*
67+
* @param <T> the return type of the action
68+
* @param metricName the metric to record
69+
* @param action the action to measure
70+
* @return the result of the action
71+
* @throws Exception if the action throws
72+
*/
73+
public <T> T measure(MetricName metricName, Callable<T> action) throws Exception {
74+
ProfileMetric metric = QueryProfiling.current().getOrCreateMetric(metricName);
75+
long start = System.nanoTime();
76+
try {
77+
return action.call();
78+
} finally {
79+
metric.set(System.nanoTime() - start);
80+
}
81+
}
82+
4583
/**
4684
* Closes the underlying resource managed by this context.
4785
*
4886
* @throws Exception if an error occurs while closing the connection
4987
*/
5088
@Override
5189
public void close() throws Exception {
90+
QueryProfiling.clear();
5291
if (planContext != null && planContext.connection != null) {
5392
planContext.connection.close();
5493
}
@@ -65,6 +104,7 @@ public static class Builder {
65104
private final Map<String, Schema> catalogs = new HashMap<>();
66105
private String defaultNamespace;
67106
private boolean cacheMetadata = false;
107+
private boolean profiling = false;
68108

69109
/**
70110
* Setting values with defaults from SysLimit.DEFAULT. Only includes planning-required settings
@@ -124,6 +164,18 @@ public Builder cacheMetadata(boolean cache) {
124164
return this;
125165
}
126166

167+
/**
168+
* Enables or disables query profiling. When enabled, profiling metrics are collected during
169+
* query planning and execution, retrievable via {@link UnifiedQueryContext#getProfile()}.
170+
*
171+
* @param enabled whether to enable profiling
172+
* @return this builder instance
173+
*/
174+
public Builder profiling(boolean enabled) {
175+
this.profiling = enabled;
176+
return this;
177+
}
178+
127179
/**
128180
* Sets a specific setting value by name.
129181
*
@@ -151,6 +203,7 @@ public UnifiedQueryContext build() {
151203
CalcitePlanContext planContext =
152204
CalcitePlanContext.create(
153205
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
206+
QueryProfiling.activate(profiling);
154207
return new UnifiedQueryContext(planContext, settings);
155208
}
156209

api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package org.opensearch.sql.api;
77

8+
import static org.opensearch.sql.monitor.profile.MetricName.ANALYZE;
9+
810
import org.antlr.v4.runtime.tree.ParseTree;
911
import org.apache.calcite.rel.RelCollation;
1012
import org.apache.calcite.rel.RelCollations;
@@ -57,9 +59,8 @@ public UnifiedQueryPlanner(UnifiedQueryContext context) {
5759
*/
5860
public RelNode plan(String query) {
5961
try {
60-
return preserveCollation(analyze(parse(query)));
62+
return context.measure(ANALYZE, () -> preserveCollation(analyze(parse(query))));
6163
} catch (SyntaxCheckException e) {
62-
// Re-throw syntax error without wrapping
6364
throw e;
6465
} catch (Exception e) {
6566
throw new IllegalStateException("Failed to plan query", e);

api/src/main/java/org/opensearch/sql/api/compiler/UnifiedQueryCompiler.java

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55

66
package org.opensearch.sql.api.compiler;
77

8+
import static org.opensearch.sql.monitor.profile.MetricName.OPTIMIZE;
9+
810
import java.sql.Connection;
911
import java.sql.PreparedStatement;
10-
import lombok.NonNull;
1112
import org.apache.calcite.interpreter.Bindables;
1213
import org.apache.calcite.plan.RelOptTable;
1314
import org.apache.calcite.rel.RelHomogeneousShuttle;
@@ -44,28 +45,31 @@ public UnifiedQueryCompiler(UnifiedQueryContext context) {
4445
* @return a compiled PreparedStatement ready for execution
4546
* @throws IllegalStateException if compilation fails
4647
*/
47-
public PreparedStatement compile(@NonNull RelNode plan) {
48+
public PreparedStatement compile(RelNode plan) {
4849
try {
49-
// Apply shuttle to convert LogicalTableScan to BindableTableScan
50-
final RelHomogeneousShuttle shuttle =
51-
new RelHomogeneousShuttle() {
52-
@Override
53-
public RelNode visit(TableScan scan) {
54-
final RelOptTable table = scan.getTable();
55-
if (scan instanceof LogicalTableScan
56-
&& Bindables.BindableTableScan.canHandle(table)) {
57-
return Bindables.BindableTableScan.create(scan.getCluster(), table);
58-
}
59-
return super.visit(scan);
60-
}
61-
};
62-
RelNode transformedPlan = plan.accept(shuttle);
63-
64-
Connection connection = context.getPlanContext().connection;
65-
final RelRunner runner = connection.unwrap(RelRunner.class);
66-
return runner.prepareStatement(transformedPlan);
50+
return context.measure(OPTIMIZE, () -> doCompile(plan));
6751
} catch (Exception e) {
6852
throw new IllegalStateException("Failed to compile logical plan", e);
6953
}
7054
}
55+
56+
private PreparedStatement doCompile(RelNode plan) throws Exception {
57+
// Apply shuttle to convert LogicalTableScan to BindableTableScan
58+
final RelHomogeneousShuttle shuttle =
59+
new RelHomogeneousShuttle() {
60+
@Override
61+
public RelNode visit(TableScan scan) {
62+
final RelOptTable table = scan.getTable();
63+
if (scan instanceof LogicalTableScan && Bindables.BindableTableScan.canHandle(table)) {
64+
return Bindables.BindableTableScan.create(scan.getCluster(), table);
65+
}
66+
return super.visit(scan);
67+
}
68+
};
69+
RelNode transformedPlan = plan.accept(shuttle);
70+
71+
Connection connection = context.getPlanContext().connection;
72+
final RelRunner runner = connection.unwrap(RelRunner.class);
73+
return runner.prepareStatement(transformedPlan);
74+
}
7175
}

api/src/main/java/org/opensearch/sql/api/transpiler/UnifiedQueryTranspiler.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,40 @@
55

66
package org.opensearch.sql.api.transpiler;
77

8+
import static org.opensearch.sql.monitor.profile.MetricName.TRANSPILE;
9+
810
import lombok.Builder;
911
import org.apache.calcite.rel.RelNode;
1012
import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
1113
import org.apache.calcite.sql.SqlDialect;
1214
import org.apache.calcite.sql.SqlNode;
15+
import org.opensearch.sql.api.UnifiedQueryContext;
1316

1417
/**
1518
* Transpiles Calcite logical plans ({@link RelNode}) into SQL strings for various target databases.
1619
* Uses Calcite's {@link RelToSqlConverter} to perform the conversion, respecting the specified SQL
1720
* dialect.
1821
*/
19-
@Builder
2022
public class UnifiedQueryTranspiler {
2123

2224
/** Target SQL dialect */
2325
private final SqlDialect dialect;
2426

27+
/** Unified query context for profiling support. */
28+
private final UnifiedQueryContext context;
29+
30+
/**
31+
* Constructs a UnifiedQueryTranspiler with a unified query context and target dialect.
32+
*
33+
* @param context the unified query context
34+
* @param dialect the target SQL dialect
35+
*/
36+
@Builder
37+
public UnifiedQueryTranspiler(UnifiedQueryContext context, SqlDialect dialect) {
38+
this.context = context;
39+
this.dialect = dialect;
40+
}
41+
2542
/**
2643
* Converts a Calcite logical plan to a SQL string using the configured target dialect.
2744
*
@@ -30,11 +47,15 @@ public class UnifiedQueryTranspiler {
3047
*/
3148
public String toSql(RelNode plan) {
3249
try {
33-
RelToSqlConverter converter = new RelToSqlConverter(dialect);
34-
SqlNode sqlNode = converter.visitRoot(plan).asStatement();
35-
return sqlNode.toSqlString(dialect).getSql();
50+
return context.measure(TRANSPILE, () -> doTranspile(plan));
3651
} catch (Exception e) {
3752
throw new IllegalStateException("Failed to transpile logical plan to SQL", e);
3853
}
3954
}
55+
56+
private String doTranspile(RelNode plan) {
57+
RelToSqlConverter converter = new RelToSqlConverter(dialect);
58+
SqlNode sqlNode = converter.visitRoot(plan).asStatement();
59+
return sqlNode.toSqlString(dialect).getSql();
60+
}
4061
}

0 commit comments

Comments
 (0)