1515import java .util .concurrent .locks .Condition ;
1616import java .util .concurrent .locks .Lock ;
1717import java .util .concurrent .locks .ReentrantLock ;
18+ import javax .annotation .concurrent .ThreadSafe ;
1819import org .slf4j .Logger ;
1920import org .slf4j .LoggerFactory ;
2021
@@ -34,7 +35,7 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
3435 private final Thread .UncaughtExceptionHandler uncaughtExceptionHandler =
3536 new PollerUncaughtExceptionHandler ();
3637 private final PollQueueBalancer balancer =
37- new PollQueueBalancer (); // Used to balance the number of pollers across threads
38+ new PollQueueBalancer (); // Used to balance the number of slots across pollers
3839
3940 AsyncPoller (
4041 TrackingSlotSupplier <?> slotSupplier ,
@@ -129,7 +130,6 @@ public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean
129130 .thenApply (
130131 (f ) -> {
131132 for (PollTaskAsync <T > asyncTaskPoller : asyncTaskPollers ) {
132- log .info ("Shutting down async poller: {} !!!!" , asyncTaskPoller );
133133 try {
134134 asyncTaskPoller .cancel (new RuntimeException ("Shutting down poller" ));
135135 } catch (Throwable e ) {
@@ -259,15 +259,14 @@ public void run() {
259259 pollerSemaphore .release ();
260260 pollScaleReportHandle .report (task , e );
261261 if (e != null ) {
262- // uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
262+ uncaughtExceptionHandler .uncaughtException (Thread .currentThread (), e );
263263 pollBackoffThrottler .failure (
264264 (e instanceof StatusRuntimeException )
265265 ? ((StatusRuntimeException ) e ).getStatus ().getCode ()
266266 : Status .Code .UNKNOWN );
267267 slotSupplier .releaseSlot (SlotReleaseReason .neverUsed (), finalPermit );
268268 return null ;
269269 }
270- log .trace ("$$$$$ Picked up task: {}" , task );
271270 if (task != null ) {
272271 taskExecutor .process (task );
273272 } else {
@@ -284,7 +283,6 @@ public void run() {
284283 } catch (PollTaskAsyncAbort ab ) {
285284 abort = true ;
286285 } catch (Throwable e ) {
287- log .error ("Error in process task loop" , e );
288286 if (e instanceof InterruptedException ) {
289287 // we restore the flag here, so it can be checked and processed (with exit) in finally.
290288 Thread .currentThread ().interrupt ();
@@ -302,23 +300,19 @@ public void run() {
302300 }
303301
304302 if (shouldTerminate ()) {
305- log .info (
306- "ABORT {} permit {}, pollRequestMade {}" ,
307- asyncTaskPoller .getLabel (),
308- permit ,
309- pollRequestMade );
310303 balancer .removePoller (asyncTaskPoller .getLabel ());
311304 abort = true ;
312305 }
313306 }
314307 }
315308 log .info (
316- "poll loop is terminated: {}, slotsReserved {}" ,
317- asyncTaskPoller . getLabel (),
318- slotsReserved );
309+ "poll loop is terminated: {} - {}" ,
310+ AsyncPoller . this . getClass (). getSimpleName (),
311+ asyncTaskPoller . getLabel () );
319312 }
320313 }
321314
315+ @ ThreadSafe
322316 class PollQueueBalancer {
323317 Map <String , AtomicInteger > taskCounts = new HashMap <>();
324318 private final Lock balancerLock = new ReentrantLock ();
@@ -336,7 +330,6 @@ void startPoll(String pollerName) {
336330 void endPoll (String pollerName ) {
337331 balancerLock .lock ();
338332 if (!taskCounts .containsKey (pollerName )) {
339- log .info ("endPoll has null key for tasks {}" , pollerName );
340333 balancerLock .unlock ();
341334 return ;
342335 }
@@ -349,13 +342,7 @@ void endPoll(String pollerName) {
349342
350343 void addPoller (String pollerName ) {
351344 balancerLock .lock ();
352- log .info ("addPoller {}" , pollerName );
353345 taskCounts .put (pollerName , new AtomicInteger (0 ));
354- if (!taskCounts .containsKey (pollerName )) {
355- log .info ("????? has null key for task {}" , pollerName );
356- balancerLock .unlock ();
357- return ;
358- }
359346 balancerCondition .signalAll ();
360347 balancerLock .unlock ();
361348 }
0 commit comments