Skip to content

Commit 3ab722f

Browse files
authored
Merge branch 'main' into unified-allocator-followup
2 parents a63141a + 3ddd784 commit 3ab722f

4 files changed

Lines changed: 45 additions & 8 deletions

File tree

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/stage/coordinator/LocalTaskRunner.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@
99
package org.opensearch.analytics.exec.stage.coordinator;
1010

1111
import org.opensearch.analytics.exec.task.TaskRunner;
12+
import org.opensearch.common.util.concurrent.AbstractRunnable;
1213
import org.opensearch.core.action.ActionListener;
1314
import org.opensearch.core.action.NotifyOnceListener;
1415

1516
import java.util.concurrent.Executor;
1617

1718
/**
18-
* LOCAL-kind task runner: submits the {@link LocalStageTask#body()} to a per-query
19-
* virtual-thread executor. The body owns the listener — wrapped in a
19+
* LOCAL-kind task runner: submits the {@link LocalStageTask} to the search
20+
* executor. The body owns the listener — wrapped in a
2021
* {@link NotifyOnceListener} so a body that both fires and throws can't double-notify.
21-
* Virtual threads keep blocking reduce drains off SEARCH workers.
2222
*
2323
* @opensearch.internal
2424
*/
@@ -43,10 +43,21 @@ protected void innerOnFailure(Exception cause) {
4343
listener.onFailure(cause);
4444
}
4545
};
46-
executor.execute(() -> {
47-
try {
46+
executor.execute(new AbstractRunnable() {
47+
@Override
48+
protected void doRun() {
4849
task.body().accept(once);
49-
} catch (Exception e) {
50+
}
51+
52+
@Override
53+
public void onFailure(Exception e) {
54+
once.onFailure(e);
55+
}
56+
57+
@Override
58+
public void onRejection(Exception e) {
59+
// Bounded pool queue full — fail the query gracefully instead of letting the
60+
// rejection escape run() and hang the query with no terminal callback.
5061
once.onFailure(e);
5162
}
5263
});

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/stage/coordinator/PassThroughStageExecution.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public PassThroughStageExecution(Stage stage, QueryContext config, ExchangeSink
3737
throw new IllegalArgumentException("PassThroughStageExecution requires a RowProducingSink");
3838
}
3939
this.ownedSink = (RowProducingSink) sink;
40-
this.runner = new LocalTaskRunner(config.localTaskExecutor());
40+
this.runner = new LocalTaskRunner(config.searchExecutor());
4141
}
4242

4343
@Override

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/stage/coordinator/ReduceStageExecution.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public ReduceStageExecution(Stage stage, QueryContext config, ReducingExchangeSi
3838
super(stage, config.queryId(), config.operationListeners(), config.parentTask());
3939
this.backendSink = backendSink;
4040
this.downstream = downstream;
41-
this.runner = new LocalTaskRunner(config.localTaskExecutor());
41+
this.runner = new LocalTaskRunner(config.searchExecutor());
4242
}
4343

4444
@Override

sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/exec/stage/LocalTaskRunnerTests.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@
1010

1111
import org.opensearch.analytics.exec.stage.coordinator.LocalStageTask;
1212
import org.opensearch.analytics.exec.stage.coordinator.LocalTaskRunner;
13+
import org.opensearch.common.util.concurrent.AbstractRunnable;
1314
import org.opensearch.core.action.ActionListener;
15+
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
1416
import org.opensearch.test.OpenSearchTestCase;
1517

18+
import java.util.concurrent.Executor;
1619
import java.util.concurrent.atomic.AtomicBoolean;
1720
import java.util.concurrent.atomic.AtomicReference;
1821

@@ -74,6 +77,29 @@ public void testDispatchSubmitsToExecutor() {
7477
assertTrue("onCompleted fires once the executor runs the body", completed.get());
7578
}
7679

80+
public void testRejectionSignalsFailedGracefully() {
81+
// A bounded pool rejects when its queue is full. OpenSearchThreadPoolExecutor invokes
82+
// onRejection on an AbstractRunnable rather than rethrowing — the runner must route that
83+
// to the listener so the query fails gracefully instead of hanging with no callback.
84+
OpenSearchRejectedExecutionException rejected = new OpenSearchRejectedExecutionException("queue full");
85+
AtomicBoolean ran = new AtomicBoolean();
86+
AtomicBoolean completed = new AtomicBoolean();
87+
AtomicReference<Exception> failure = new AtomicReference<>();
88+
89+
Executor rejectingExecutor = r -> ((AbstractRunnable) r).onRejection(rejected);
90+
LocalTaskRunner dispatcher = new LocalTaskRunner(rejectingExecutor);
91+
LocalStageTask task = new LocalStageTask(new StageTaskId(0, 0), listener -> {
92+
ran.set(true);
93+
listener.onResponse(null);
94+
});
95+
96+
dispatcher.run(task, handle(completed, failure));
97+
98+
assertFalse("body must not run when the task is rejected", ran.get());
99+
assertFalse("onCompleted must not fire when rejected", completed.get());
100+
assertSame("rejection is routed to listener.onFailure", rejected, failure.get());
101+
}
102+
77103
private static ActionListener<Void> handle(AtomicBoolean completed, AtomicReference<Exception> failure) {
78104
return new ActionListener<Void>() {
79105
@Override

0 commit comments

Comments
 (0)