Skip to content

Commit 7ed954a

Browse files
authored
Merge analytics-engine profile into SQL-layer profile (#5571)
Signed-off-by: Chen Dai <daichen@amazon.com>
1 parent d249a47 commit 7ed954a

6 files changed

Lines changed: 88 additions & 47 deletions

File tree

core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,9 @@ public void executeWithProfile(
169169
org.opensearch.analytics.QueryRequestContext queryCtx,
170170
ResponseListener<QueryResponse> listener) {
171171

172+
ProfileContext profileCtx = QueryProfiling.current();
173+
long execStart = System.nanoTime();
174+
172175
planExecutor.executeWithProfile(
173176
plan,
174177
queryCtx,
@@ -178,6 +181,7 @@ public void onResponse(ProfiledResult result) {
178181
try {
179182
// ProfiledResult delivers the profile on BOTH success and failure paths
180183
// so users get stage timing visibility even when a query partially fails.
184+
profileCtx.getOrCreateMetric(MetricName.EXECUTE).set(System.nanoTime() - execStart);
181185
QueryResponse response = buildProfiledResponse(plan, result);
182186
listener.onResponse(response);
183187
} catch (Exception e) {

core/src/main/java/org/opensearch/sql/monitor/profile/DefaultProfileContext.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public class DefaultProfileContext implements ProfileContext {
1717
private boolean finished;
1818
private final Map<MetricName, DefaultMetricImpl> metrics = new ConcurrentHashMap<>();
1919
private ProfilePlanNode planRoot;
20+
private Object enginePlan;
2021
private QueryProfile profile;
2122

2223
public DefaultProfileContext() {}
@@ -40,6 +41,11 @@ public synchronized void setPlanRoot(ProfilePlanNode planRoot) {
4041
}
4142
}
4243

44+
@Override
45+
public synchronized void setEnginePlan(Object enginePlan) {
46+
this.enginePlan = enginePlan;
47+
}
48+
4349
/** {@inheritDoc} */
4450
@Override
4551
public synchronized QueryProfile finish() {
@@ -55,7 +61,8 @@ public synchronized QueryProfile finish() {
5561
snapshot.put(metricName, millis);
5662
}
5763
double totalMillis = ProfileUtils.roundToMillis(endNanos - startNanos);
58-
QueryProfile.PlanNode planSnapshot = planRoot == null ? null : planRoot.snapshot();
64+
Object planSnapshot =
65+
enginePlan != null ? enginePlan : (planRoot == null ? null : planRoot.snapshot());
5966
profile = new QueryProfile(totalMillis, snapshot, planSnapshot);
6067
return profile;
6168
}

core/src/main/java/org/opensearch/sql/monitor/profile/ProfileContext.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ public interface ProfileContext {
2727
*/
2828
void setPlanRoot(ProfilePlanNode planRoot);
2929

30+
/** TODO: merge with planRoot into one generic execution-engine-specific plan profile field. */
31+
default void setEnginePlan(Object plan) {}
32+
3033
/**
3134
* Finalize profiling and return a snapshot.
3235
*

core/src/main/java/org/opensearch/sql/monitor/profile/QueryProfile.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ public final class QueryProfile {
2121

2222
private final Map<String, Phase> phases;
2323

24-
private final PlanNode plan;
24+
/** Execution-engine-specific plan profile: a {@link PlanNode} tree, or a pre-rendered object. */
25+
private final Object plan;
2526

2627
/**
2728
* Create a new query profile snapshot.
@@ -40,7 +41,7 @@ public QueryProfile(double totalTimeMillis, Map<MetricName, Double> phases) {
4041
* @param phases metric values keyed by {@link MetricName}
4142
* @param plan plan tree profiling output
4243
*/
43-
public QueryProfile(double totalTimeMillis, Map<MetricName, Double> phases, PlanNode plan) {
44+
public QueryProfile(double totalTimeMillis, Map<MetricName, Double> phases, Object plan) {
4445
this.summary = new Summary(totalTimeMillis);
4546
this.phases = buildPhases(phases);
4647
this.plan = plan;

integ-test/src/test/java/org/opensearch/sql/analytics/AnalyticsEngineProfileIT.java

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,21 @@ public void testPplProfileReturnsStages() throws IOException {
103103
assertTrue("has profile", result.has("profile"));
104104

105105
JSONObject profile = result.getJSONObject("profile");
106-
assertTrue("has query_id", profile.has("query_id"));
107-
assertTrue("has planning_time_ms", profile.has("planning_time_ms"));
108-
assertTrue("has execution_time_ms", profile.has("execution_time_ms"));
109-
assertTrue("has full_plan", profile.has("full_plan"));
110-
111-
JSONArray stages = profile.getJSONArray("stages");
106+
assertTrue("has summary", profile.has("summary"));
107+
assertTrue("summary has total_time_ms", profile.getJSONObject("summary").has("total_time_ms"));
108+
assertTrue("has phases", profile.has("phases"));
109+
JSONObject phases = profile.getJSONObject("phases");
110+
assertTrue("phase analyze", phases.getJSONObject("analyze").has("time_ms"));
111+
assertTrue("phase execute", phases.getJSONObject("execute").has("time_ms"));
112+
assertTrue("phase format", phases.getJSONObject("format").has("time_ms"));
113+
114+
JSONObject plan = profile.getJSONObject("plan");
115+
assertTrue("has query_id", plan.has("query_id"));
116+
assertTrue("has planning_time_ms", plan.has("planning_time_ms"));
117+
assertTrue("has execution_time_ms", plan.has("execution_time_ms"));
118+
assertTrue("has full_plan", plan.has("full_plan"));
119+
120+
JSONArray stages = plan.getJSONArray("stages");
112121
assertTrue("at least one stage", stages.length() >= 1);
113122

114123
JSONObject stage = stages.getJSONObject(0);
@@ -129,9 +138,12 @@ public void testSqlProfileReturnsStages() throws IOException {
129138
assertTrue("has profile", result.has("profile"));
130139

131140
JSONObject profile = result.getJSONObject("profile");
132-
assertTrue("has query_id", profile.has("query_id"));
133-
assertTrue("has stages", profile.has("stages"));
134-
JSONArray stages = profile.getJSONArray("stages");
141+
assertTrue("has summary", profile.has("summary"));
142+
assertTrue("has phases", profile.has("phases"));
143+
JSONObject plan = profile.getJSONObject("plan");
144+
assertTrue("has query_id", plan.has("query_id"));
145+
assertTrue("has stages", plan.has("stages"));
146+
JSONArray stages = plan.getJSONArray("stages");
135147
assertTrue("at least one stage", stages.length() >= 1);
136148
}
137149

@@ -142,7 +154,7 @@ public void testPplProfileStagesShowSucceeded() throws IOException {
142154
executeWithProfile("source = " + INDEX + " | fields name, score", "/_plugins/_ppl");
143155

144156
JSONObject profile = result.getJSONObject("profile");
145-
JSONArray stages = profile.getJSONArray("stages");
157+
JSONArray stages = profile.getJSONObject("plan").getJSONArray("stages");
146158

147159
for (int i = 0; i < stages.length(); i++) {
148160
JSONObject stage = stages.getJSONObject(i);
@@ -158,7 +170,7 @@ public void testPplProfileTasksHaveNodeAndTiming() throws IOException {
158170
executeWithProfile("source = " + INDEX + " | fields name", "/_plugins/_ppl");
159171

160172
JSONObject profile = result.getJSONObject("profile");
161-
JSONArray stages = profile.getJSONArray("stages");
173+
JSONArray stages = profile.getJSONObject("plan").getJSONArray("stages");
162174

163175
boolean foundTasks = false;
164176
for (int i = 0; i < stages.length(); i++) {

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java

Lines changed: 47 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;
1111
import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY;
1212

13+
import com.google.gson.JsonElement;
14+
import com.google.gson.JsonObject;
15+
import com.google.gson.JsonParser;
1316
import java.util.Map;
1417
import java.util.Optional;
1518
import org.apache.calcite.rel.RelNode;
@@ -22,7 +25,10 @@
2225
import org.opensearch.analytics.exec.profile.QueryProfile;
2326
import org.opensearch.cluster.service.ClusterService;
2427
import org.opensearch.common.unit.TimeValue;
28+
import org.opensearch.common.xcontent.XContentFactory;
2529
import org.opensearch.core.action.ActionListener;
30+
import org.opensearch.core.xcontent.ToXContent;
31+
import org.opensearch.core.xcontent.XContentBuilder;
2632
import org.opensearch.index.IndexSettings;
2733
import org.opensearch.indices.IndicesService;
2834
import org.opensearch.sql.api.UnifiedQueryContext;
@@ -38,6 +44,8 @@
3844
import org.opensearch.sql.executor.QueryType;
3945
import org.opensearch.sql.executor.analytics.AnalyticsExecutionEngine;
4046
import org.opensearch.sql.lang.LangSpec;
47+
import org.opensearch.sql.monitor.profile.ProfileContext;
48+
import org.opensearch.sql.monitor.profile.QueryProfiling;
4149
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
4250
import org.opensearch.sql.protocol.response.QueryResult;
4351
import org.opensearch.sql.protocol.response.format.ResponseFormatter;
@@ -185,10 +193,9 @@ private void doExecute(
185193
// Carry the front-end task so cancellation propagates into the engine.
186194
QueryRequestContext queryCtx =
187195
withParentTask(contextProvider.getContext(), parentTask);
188-
// Disable SQL-layer phase profiling when analytics engine profiling is active.
189-
// Our QueryProfile (stages, tasks, timing) is strictly more detailed and replaces
190-
// it.
191-
UnifiedQueryContext context = buildContext(queryType, false, queryCtx);
196+
197+
UnifiedQueryContext context = buildContext(queryType, profiling, queryCtx);
198+
ProfileContext profileCtx = QueryProfiling.current();
192199
ActionListener<TransportPPLQueryResponse> closingListener =
193200
wrapWithContextClose(context, listener);
194201
try {
@@ -205,13 +212,13 @@ private void doExecute(
205212
plan,
206213
planContext,
207214
queryCtx,
208-
createQueryListener(queryType, closingListener));
215+
createQueryListener(queryType, profileCtx, closingListener));
209216
} else {
210217
analyticsEngine.execute(
211218
plan,
212219
planContext,
213220
queryCtx,
214-
createQueryListener(queryType, closingListener));
221+
createQueryListener(queryType, profileCtx, closingListener));
215222
}
216223
} catch (Exception e) {
217224
closingListener.onFailure(e);
@@ -361,19 +368,32 @@ private static RelNode addFetchSizeLimit(
361368
}
362369

363370
private ResponseListener<QueryResponse> createQueryListener(
364-
QueryType queryType, ActionListener<TransportPPLQueryResponse> transportListener) {
371+
QueryType queryType,
372+
ProfileContext profileCtx,
373+
ActionListener<TransportPPLQueryResponse> transportListener) {
365374
ResponseFormatter<QueryResult> formatter = new SimpleJsonResponseFormatter(PRETTY);
366375
return new ResponseListener<QueryResponse>() {
367376
@Override
368377
public void onResponse(QueryResponse response) {
369378
LangSpec langSpec = queryType == QueryType.PPL ? PPL_SPEC : LangSpec.SQL_SPEC;
370-
String result =
371-
formatter.format(
372-
new QueryResult(
373-
response.getSchema(), response.getResults(), response.getCursor(), langSpec));
379+
380+
// Set the engine profile as the plan so the formatter serializes it in one pass.
374381
if (response.getProfile() != null) {
375-
// Append profile and error (if any) to the JSON response
376-
result = appendProfileToJson(result, response.getProfile(), response.getError());
382+
profileCtx.setEnginePlan(toJsonElement(response.getProfile()));
383+
}
384+
385+
String result =
386+
QueryProfiling.withCurrentContext(
387+
profileCtx,
388+
() ->
389+
formatter.format(
390+
new QueryResult(
391+
response.getSchema(),
392+
response.getResults(),
393+
response.getCursor(),
394+
langSpec)));
395+
if (response.getError() != null) {
396+
result = appendError(result, response.getError());
377397
}
378398
transportListener.onResponse(new TransportPPLQueryResponse(result));
379399
}
@@ -385,30 +405,24 @@ public void onFailure(Exception e) {
385405
};
386406
}
387407

388-
private static String appendProfileToJson(String json, QueryProfile profile, Throwable error) {
408+
private static JsonElement toJsonElement(QueryProfile profile) {
389409
try {
390-
StringBuilder extra = new StringBuilder();
391-
// Append profile
392-
org.opensearch.core.xcontent.XContentBuilder builder =
393-
org.opensearch.common.xcontent.XContentFactory.jsonBuilder();
394-
profile.toXContent(builder, org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS);
395-
extra.append(",\"profile\":").append(builder.toString());
396-
// Append error if query partially failed
397-
if (error != null) {
398-
extra
399-
.append(",\"error\":{\"type\":\"")
400-
.append(error.getClass().getSimpleName())
401-
.append("\",\"reason\":\"")
402-
.append(error.getMessage() != null ? error.getMessage().replace("\"", "\\\"") : "")
403-
.append("\"}");
404-
}
405-
if (json.endsWith("}")) {
406-
return json.substring(0, json.length() - 1) + extra + "}";
407-
}
408-
return json;
410+
XContentBuilder builder = XContentFactory.jsonBuilder();
411+
profile.toXContent(builder, ToXContent.EMPTY_PARAMS);
412+
return JsonParser.parseString(builder.toString());
409413
} catch (Exception e) {
414+
return null;
415+
}
416+
}
417+
418+
private static String appendError(String json, Throwable error) {
419+
if (!json.endsWith("}")) {
410420
return json;
411421
}
422+
JsonObject err = new JsonObject();
423+
err.addProperty("type", error.getClass().getSimpleName());
424+
err.addProperty("reason", error.getMessage() != null ? error.getMessage() : "");
425+
return json.substring(0, json.length() - 1) + ",\"error\":" + err + "}";
412426
}
413427

414428
private static Runnable withCurrentContext(final Runnable task) {

0 commit comments

Comments
 (0)