Skip to content

Commit b968d7f

Browse files
committed
fix(analytics): preserve profile context across async listener thread
QueryPlanExecutor (analytics-framework 3.7+) fires its listener on a worker thread separate from the one that activated QueryProfiling. Without explicit handoff, the listener-side formatter reads an empty profile context, and the synchronous UnifiedQueryContext.close() triggered by try-with-resources fires before the async work finishes — closing the Calcite connection while it is still in use and clearing profile state before the listener can read it.

- AnalyticsExecutionEngine.execute: capture the active ProfileContext and restore it via QueryProfiling.withCurrentContext inside the response callback, so metric reads/writes see the same context.
- RestUnifiedQueryAction.execute: defer UnifiedQueryContext.close to listener completion via ActionListener.runAfter, wrapped in a named wrapWithContextClose helper.
- Add a regression test simulating async listener delivery.

Refs: #5317
Signed-off-by: Chen Dai <daichen@amazon.com>
1 parent f022dd9 commit b968d7f

3 files changed

Lines changed: 83 additions & 14 deletions

File tree

core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.opensearch.sql.executor.ExecutionEngine;
3434
import org.opensearch.sql.executor.pagination.Cursor;
3535
import org.opensearch.sql.monitor.profile.MetricName;
36-
import org.opensearch.sql.monitor.profile.ProfileMetric;
36+
import org.opensearch.sql.monitor.profile.ProfileContext;
3737
import org.opensearch.sql.monitor.profile.QueryProfiling;
3838
import org.opensearch.sql.planner.physical.PhysicalPlan;
3939

@@ -81,7 +81,7 @@ public void execute(
8181
// to a worker pool and results arrive on the listener. Record the execute metric in the
8282
// listener callback, before delegating to the user-supplied listener, so the metric snapshot
8383
// taken by SimpleJsonResponseFormatter sees the correct value.
84-
ProfileMetric execMetric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE);
84+
ProfileContext profileCtx = QueryProfiling.current();
8585
long execStart = System.nanoTime();
8686

8787
planExecutor.execute(
@@ -90,15 +90,22 @@ public void execute(
9090
new ActionListener<>() {
9191
@Override
9292
public void onResponse(Iterable<Object[]> rows) {
93-
try {
94-
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
95-
List<ExprValue> results = convertRows(rows, fields);
96-
Schema schema = buildSchema(fields);
97-
execMetric.set(System.nanoTime() - execStart);
98-
listener.onResponse(new QueryResponse(schema, results, Cursor.None));
99-
} catch (Exception e) {
100-
listener.onFailure(e);
101-
}
93+
QueryProfiling.withCurrentContext(
94+
profileCtx,
95+
() -> {
96+
try {
97+
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
98+
List<ExprValue> results = convertRows(rows, fields);
99+
Schema schema = buildSchema(fields);
100+
profileCtx
101+
.getOrCreateMetric(MetricName.EXECUTE)
102+
.set(System.nanoTime() - execStart);
103+
listener.onResponse(new QueryResponse(schema, results, Cursor.None));
104+
} catch (Exception e) {
105+
listener.onFailure(e);
106+
}
107+
return null;
108+
});
102109
}
103110

104111
@Override

core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import static org.junit.jupiter.api.Assertions.assertEquals;
99
import static org.junit.jupiter.api.Assertions.assertNotNull;
10+
import static org.junit.jupiter.api.Assertions.assertSame;
1011
import static org.junit.jupiter.api.Assertions.assertTrue;
1112
import static org.mockito.ArgumentMatchers.any;
1213
import static org.mockito.ArgumentMatchers.eq;
@@ -37,6 +38,8 @@
3738
import org.opensearch.sql.data.type.ExprCoreType;
3839
import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse;
3940
import org.opensearch.sql.executor.ExecutionEngine.QueryResponse;
41+
import org.opensearch.sql.monitor.profile.ProfileContext;
42+
import org.opensearch.sql.monitor.profile.QueryProfiling;
4043
import org.opensearch.sql.planner.physical.PhysicalPlan;
4144

4245
class AnalyticsExecutionEngineTest {
@@ -330,6 +333,49 @@ void physicalPlanExplain_callsOnFailure() {
330333
+ errorRef.get().getMessage());
331334
}
332335

336+
@Test
337+
@SuppressWarnings("unchecked")
338+
void executeRelNode_profilePreservedOnAsyncListener() throws Exception {
339+
ProfileContext expected = QueryProfiling.activate(true);
340+
341+
RelNode relNode = mockRelNode("col", SqlTypeName.VARCHAR);
342+
Iterable<Object[]> rows = Collections.singletonList(new Object[] {"value"});
343+
344+
// Fire the executor's listener on a different thread to simulate async dispatch
345+
doAnswer(
346+
inv -> {
347+
ActionListener<Iterable<Object[]>> al = inv.getArgument(2);
348+
Thread t = new Thread(() -> al.onResponse(rows));
349+
t.start();
350+
t.join();
351+
return null;
352+
})
353+
.when(mockExecutor)
354+
.execute(eq(relNode), any(), any(ActionListener.class));
355+
356+
AtomicReference<ProfileContext> seen = new AtomicReference<>();
357+
engine.execute(
358+
relNode,
359+
mockContext,
360+
new ResponseListener<>() {
361+
@Override
362+
public void onResponse(QueryResponse response) {
363+
seen.set(QueryProfiling.current());
364+
}
365+
366+
@Override
367+
public void onFailure(Exception e) {
368+
throw new AssertionError(e);
369+
}
370+
});
371+
372+
try {
373+
assertSame(expected, seen.get(), "Profile context not restored on async listener thread");
374+
} finally {
375+
QueryProfiling.clear();
376+
}
377+
}
378+
333379
// --- helpers ---
334380

335381
private QueryResponse executeAndCapture(RelNode relNode) {

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,15 +118,18 @@ public void execute(
118118
.schedule(
119119
withCurrentContext(
120120
() -> {
121-
try (UnifiedQueryContext context = buildContext(queryType, profiling)) {
121+
UnifiedQueryContext context = buildContext(queryType, profiling);
122+
ActionListener<TransportPPLQueryResponse> closingListener =
123+
wrapWithContextClose(context, listener);
124+
try {
122125
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
123126
RelNode plan = planner.plan(query);
124127
CalcitePlanContext planContext = context.getPlanContext();
125128
plan = addQuerySizeLimit(plan, planContext);
126129
analyticsEngine.execute(
127-
plan, planContext, createQueryListener(queryType, listener));
130+
plan, planContext, createQueryListener(queryType, closingListener));
128131
} catch (Exception e) {
129-
listener.onFailure(e);
132+
closingListener.onFailure(e);
130133
}
131134
}),
132135
new TimeValue(0),
@@ -256,4 +259,17 @@ private static Runnable withCurrentContext(final Runnable task) {
256259
task.run();
257260
};
258261
}
262+
263+
private static ActionListener<TransportPPLQueryResponse> wrapWithContextClose(
264+
UnifiedQueryContext context, ActionListener<TransportPPLQueryResponse> delegate) {
265+
return ActionListener.runAfter(
266+
delegate,
267+
() -> {
268+
try {
269+
context.close();
270+
} catch (Exception e) {
271+
LOG.warn("Failed to close query context", e);
272+
}
273+
});
274+
}
259275
}

0 commit comments

Comments
 (0)