@@ -284,6 +284,7 @@ public void run() {
284284 } catch (PollTaskAsyncAbort ab ) {
285285 abort = true ;
286286 } catch (Throwable e ) {
287+ log .error ("Error in process task loop" , e );
287288 if (e instanceof InterruptedException ) {
288289 // we restore the flag here, so it can be checked and processed (with exit) in finally.
289290 Thread .currentThread ().interrupt ();
@@ -370,24 +371,28 @@ void removePoller(String pollerName) {
370371 void balance (String p ) throws InterruptedException {
371372 while (!shouldTerminate ()) {
372373 balancerLock .lock ();
373- // If this poller has no tasks then we can unblock immediately
374- if (taskCounts .get (p ).get () == 0 ) {
375- balancerLock .unlock ();
376- return ;
377- }
378- // Check if all tasks have at least one poll request
379- boolean allOtherTasksHavePolls = true ;
380- for (String task : taskCounts .keySet ()) {
381- if (!Objects .equals (task , p ) && taskCounts .get (task ).get () == 0 ) {
382- allOtherTasksHavePolls = false ;
383- break ;
374+ try {
375+ // If this poller has no tasks then we can unblock immediately
376+ if (taskCounts .get (p ).get () == 0 ) {
377+ return ;
384378 }
385- }
386- if (!allOtherTasksHavePolls ) {
387- balancerCondition .await ();
388- } else {
379+ // Check if all tasks have at least one poll request
380+ boolean allOtherTasksHavePolls = true ;
381+ for (String task : taskCounts .keySet ()) {
382+ if (!Objects .equals (task , p ) && taskCounts .get (task ).get () == 0 ) {
383+ allOtherTasksHavePolls = false ;
384+ break ;
385+ }
386+ }
387+ if (!allOtherTasksHavePolls ) {
388+ log .info (
389+ "Waiting for other tasks to have at least one poll request before proceeding with task" );
390+ balancerCondition .await ();
391+ } else {
392+ return ;
393+ }
394+ } finally {
389395 balancerLock .unlock ();
390- return ;
391396 }
392397 }
393398 }
0 commit comments