Skip to content

Commit 68c69df

Browse files
sliceofapplepiesureshanaparti
authored andcommitted
Async event bus publishing in AsyncJobManagerImpl to reduce API thread contention
publishOnEventBus() was calling _messageBus.publish() synchronously on the request thread, which blocks on MessageBusBase$Gate (an exclusive mutex). JFR analysis showed this causing up to 107ms waits on Jetty request threads, contributing to 502 errors from the upstream load balancer. Move event bus publishing to a dedicated single-threaded executor so API request threads are no longer blocked by Gate contention. Event ordering is preserved by the single-threaded executor.
1 parent 4708121 commit 68c69df

File tree

1 file changed

+9
-2
lines changed

1 file changed

+9
-2
lines changed

framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
184184
private volatile long _executionRunNumber = 1;
185185

186186
private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
187+
private final ExecutorService _eventBusPublisher = Executors.newSingleThreadExecutor(new NamedThreadFactory("AsyncJobMgr-EventBus"));
187188
private ExecutorService _apiJobExecutor;
188189
private ExecutorService _workerJobExecutor;
189190

@@ -1378,6 +1379,7 @@ public boolean start() {
13781379
@Override
13791380
public boolean stop() {
13801381
_heartbeatScheduler.shutdown();
1382+
_eventBusPublisher.shutdown();
13811383
_apiJobExecutor.shutdown();
13821384
_workerJobExecutor.shutdown();
13831385
return true;
@@ -1397,8 +1399,13 @@ protected AsyncJobManagerImpl() {
13971399
}
13981400

13991401
private void publishOnEventBus(AsyncJob job, String jobEvent) {
1400-
_messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
1401-
new Pair<AsyncJob, String>(job, jobEvent));
1402+
try {
1403+
_eventBusPublisher.submit(() ->
1404+
_messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
1405+
new Pair<AsyncJob, String>(job, jobEvent)));
1406+
} catch (RejectedExecutionException e) {
1407+
s_logger.warn("Failed to publish async job event, event bus publisher is shut down", e);
1408+
}
14021409
}
14031410

14041411
@Override

0 commit comments

Comments
 (0)