@@ -34,7 +34,7 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
3434 private Throttler pollRateThrottler ;
3535 private final Thread .UncaughtExceptionHandler uncaughtExceptionHandler =
3636 new PollerUncaughtExceptionHandler ();
37- private final PollQueueBalancer balancer =
37+ private final PollQueueBalancer pollerBalancer =
3838 new PollQueueBalancer (); // Used to balance the number of slots across pollers
3939
4040 AsyncPoller (
@@ -111,15 +111,15 @@ public boolean start() {
111111 pollerBehavior .getMaxConcurrentTaskPollers (),
112112 pollerBehavior .getInitialMaxConcurrentTaskPollers (),
113113 (newTarget ) -> {
114- log .info (
114+ log .debug (
115115 "Updating maximum number of pollers for {} to: {}" ,
116116 asyncTaskPoller .getLabel (),
117117 newTarget );
118118 pollerSemaphore .setMaxPermits (newTarget );
119119 });
120120 PollQueueTask pollQueue =
121121 new PollQueueTask (asyncTaskPoller , pollerSemaphore , pollScaleReportHandle );
122- balancer .addPoller (asyncTaskPoller .getLabel ());
122+ pollerBalancer .addPoller (asyncTaskPoller .getLabel ());
123123 exec .execute (pollQueue );
124124 exec .scheduleAtFixedRate (pollScaleReportHandle , 0 , 100 , TimeUnit .MILLISECONDS );
125125 }
@@ -133,6 +133,7 @@ public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean
133133 (f ) -> {
134134 for (PollTaskAsync <T > asyncTaskPoller : asyncTaskPollers ) {
135135 try {
136+ log .debug ("Shutting down async poller: {}" , asyncTaskPoller .getLabel ());
136137 asyncTaskPoller .cancel (new RuntimeException ("Shutting down poller" ));
137138 } catch (Throwable e ) {
138139 log .error ("Error while cancelling poll task" , e );
@@ -186,7 +187,6 @@ class PollQueueTask implements Runnable {
186187
187188 @ Override
188189 public void run () {
189- int slotsReserved = 0 ;
190190 while (!abort ) {
191191 // Permit to reserve a slot for the poll request.
192192 SlotPermit permit = null ;
@@ -217,7 +217,7 @@ public void run() {
217217 return ;
218218 }
219219
220- balancer .balance (asyncTaskPoller .getLabel ());
220+ pollerBalancer .balance (asyncTaskPoller .getLabel ());
221221 if (shouldTerminate ()) {
222222 return ;
223223 }
@@ -231,12 +231,8 @@ public void run() {
231231 }
232232 permit = BasePoller .getSlotPermitAndHandleInterrupts (future , slotSupplier );
233233 if (permit == null || shouldTerminate ()) {
234- if (permit != null ) {
235- slotsReserved ++;
236- }
237234 return ;
238235 }
239- slotsReserved ++;
240236
241237 pollerSemaphore .acquire ();
242238 pollerSemaphoreAcquired = true ;
@@ -249,12 +245,12 @@ public void run() {
249245 CompletableFuture <T > pollRequest = asyncTaskPoller .poll (permit );
250246 // Mark that we have made a poll request
251247 pollRequestMade = true ;
252- balancer .startPoll (asyncTaskPoller .getLabel ());
248+ pollerBalancer .startPoll (asyncTaskPoller .getLabel ());
253249
254250 pollRequest
255251 .handle (
256252 (task , e ) -> {
257- balancer .endPoll (asyncTaskPoller .getLabel ());
253+ pollerBalancer .endPoll (asyncTaskPoller .getLabel ());
258254 if (e instanceof CompletionException ) {
259255 e = e .getCause ();
260256 }
@@ -302,18 +298,22 @@ public void run() {
302298 }
303299
304300 if (shouldTerminate ()) {
305- balancer .removePoller (asyncTaskPoller .getLabel ());
301+ pollerBalancer .removePoller (asyncTaskPoller .getLabel ());
306302 abort = true ;
307303 }
308304 }
309305 }
310306 log .info (
311- "poll loop is terminated: {} - {}" ,
307+ "Poll loop is terminated: {} - {}" ,
312308 AsyncPoller .this .getClass ().getSimpleName (),
313309 asyncTaskPoller .getLabel ());
314310 }
315311 }
316312
313+ /**
314+ * PollQueueBalancer is used to ensure that at least one poller is running for each task. This is
315+ * necessary to avoid one poller from consuming all the slots and starving other pollers.
316+ */
317317 @ ThreadSafe
318318 class PollQueueBalancer {
319319 Map <String , AtomicInteger > taskCounts = new HashMap <>();
@@ -374,8 +374,6 @@ void balance(String p) throws InterruptedException {
374374 }
375375 }
376376 if (!allOtherTasksHavePolls ) {
377- log .info (
378- "Waiting for other tasks to have at least one poll request before proceeding with task" );
379377 balancerCondition .await ();
380378 } else {
381379 return ;
0 commit comments