@@ -116,7 +116,7 @@ public boolean start() {
116116 });
117117 PollQueueTask pollQueue =
118118 new PollQueueTask (asyncTaskPoller , pollerSemaphore , pollScaleReportHandle );
119- balancer .addPoller (pollQueue );
119+ balancer .addPoller (asyncTaskPoller . getLabel () );
120120 exec .execute (pollQueue );
121121 exec .scheduleAtFixedRate (pollScaleReportHandle , 0 , 100 , TimeUnit .MILLISECONDS );
122122 }
@@ -129,6 +129,7 @@ public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean
129129 .thenApply (
130130 (f ) -> {
131131 for (PollTaskAsync <T > asyncTaskPoller : asyncTaskPollers ) {
132+ log .info ("Shutting down async poller: {} !!!!" , asyncTaskPoller );
132133 try {
133134 asyncTaskPoller .cancel (new RuntimeException ("Shutting down poller" ));
134135 } catch (Throwable e ) {
@@ -152,6 +153,10 @@ public interface PollTaskAsync<TT> {
152153 default void cancel (Throwable cause ) {
153154 // no-op
154155 }
156+
157+ default String getLabel () {
158+ return "AsyncPollTask" ;
159+ }
155160 }
156161
157162 class PollQueueTask implements Runnable {
@@ -179,171 +184,211 @@ class PollQueueTask implements Runnable {
179184
180185 @ Override
181186 public void run () {
182- // Permit to reserve a slot for the poll request.
183- SlotPermit permit = null ;
184- // Flag to check if pollerSemaphore was acquired, if so, we need to release it if
185- // an exception occurs.
186- boolean pollerSemaphoreAcquired = false ;
187- // Flag to check if poll request was made, if not, we need to release the slot
188- // permit and pollerSemaphore in this method.
189- boolean pollRequestMade = false ;
190- try {
191- long throttleMs = pollBackoffThrottler .getSleepTime ();
192- if (throttleMs > 0 ) {
193- Thread .sleep (throttleMs );
194- }
195- if (pollRateThrottler != null ) {
196- pollRateThrottler .throttle ();
197- }
187+ int slotsReserved = 0 ;
188+ while (!abort ) {
189+ // Permit to reserve a slot for the poll request.
190+ SlotPermit permit = null ;
191+ // Flag to check if pollerSemaphore was acquired, if so, we need to release it if
192+ // an exception occurs.
193+ boolean pollerSemaphoreAcquired = false ;
194+ // Flag to check if poll request was made, if not, we need to release the slot
195+ // permit and pollerSemaphore in this method.
196+ boolean pollRequestMade = false ;
197+ try {
198+ long throttleMs = pollBackoffThrottler .getSleepTime ();
199+ if (throttleMs > 0 ) {
200+ Thread .sleep (throttleMs );
201+ }
202+ if (pollRateThrottler != null ) {
203+ pollRateThrottler .throttle ();
204+ }
198205
199- CountDownLatch suspender = suspendLatch .get ();
200- if (suspender != null ) {
201- if (log .isDebugEnabled ()) {
202- log .debug ("poll task suspending latchCount=" + suspender .getCount ());
206+ CountDownLatch suspender = suspendLatch .get ();
207+ if (suspender != null ) {
208+ if (log .isDebugEnabled ()) {
209+ log .debug ("poll task suspending latchCount=" + suspender .getCount ());
210+ }
211+ suspender .await ();
203212 }
204- suspender .await ();
205- }
206213
207- if (shouldTerminate ()) {
208- return ;
209- }
214+ if (shouldTerminate ()) {
215+ return ;
216+ }
210217
211- balancer .balance (this );
212- if (shouldTerminate ()) {
213- return ;
214- }
215- // Reserve a slot for the poll request
216- SlotSupplierFuture future ;
217- try {
218- future = slotSupplier .reserveSlot (slotReservationData );
219- } catch (Exception e ) {
220- log .warn ("Error while trying to reserve a slot" , e .getCause ());
221- return ;
222- }
223- permit = BasePoller .getSlotPermitAndHandleInterrupts (future , slotSupplier );
224- if (permit == null || shouldTerminate ()) {
225- return ;
226- }
218+ balancer .balance (asyncTaskPoller .getLabel ());
219+ if (shouldTerminate ()) {
220+ return ;
221+ }
222+ // Reserve a slot for the poll request
223+ SlotSupplierFuture future ;
224+ try {
225+ future = slotSupplier .reserveSlot (slotReservationData );
226+ } catch (Exception e ) {
227+ log .warn ("Error while trying to reserve a slot" , e .getCause ());
228+ return ;
229+ }
230+ permit = BasePoller .getSlotPermitAndHandleInterrupts (future , slotSupplier );
231+ if (permit == null || shouldTerminate ()) {
232+ if (permit != null ) {
233+ slotsReserved ++;
234+ }
235+ return ;
236+ }
237+ slotsReserved ++;
227238
228- pollerSemaphore .acquire ();
229- pollerSemaphoreAcquired = true ;
230- if (shouldTerminate ()) {
231- return ;
232- }
233- workerMetricsScope .counter (MetricsType .POLLER_START_COUNTER ).inc (1 );
239+ pollerSemaphore .acquire ();
240+ pollerSemaphoreAcquired = true ;
241+ if (shouldTerminate ()) {
242+ return ;
243+ }
244+ workerMetricsScope .counter (MetricsType .POLLER_START_COUNTER ).inc (1 );
234245
235- SlotPermit finalPermit = permit ;
236- CompletableFuture <T > pollRequest = asyncTaskPoller .poll (permit );
237- // Mark that we have made a poll request
238- pollRequestMade = true ;
239- balancer .startPoll (this );
246+ SlotPermit finalPermit = permit ;
247+ CompletableFuture <T > pollRequest = asyncTaskPoller .poll (permit );
248+ // Mark that we have made a poll request
249+ pollRequestMade = true ;
250+ balancer .startPoll (asyncTaskPoller . getLabel () );
240251
241- pollRequest .handle (
242- (task , e ) -> {
243- balancer .endPoll (this );
244- if (e instanceof CompletionException ) {
245- e = e .getCause ();
246- }
252+ pollRequest
253+ .handle (
254+ (task , e ) -> {
255+ balancer .endPoll (asyncTaskPoller .getLabel ());
256+ if (e instanceof CompletionException ) {
257+ e = e .getCause ();
258+ }
259+ pollerSemaphore .release ();
260+ pollScaleReportHandle .report (task , e );
261+ if (e != null ) {
262+ // uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
263+ pollBackoffThrottler .failure (
264+ (e instanceof StatusRuntimeException )
265+ ? ((StatusRuntimeException ) e ).getStatus ().getCode ()
266+ : Status .Code .UNKNOWN );
267+ slotSupplier .releaseSlot (SlotReleaseReason .neverUsed (), finalPermit );
268+ return null ;
269+ }
270+ log .trace ("$$$$$ Picked up task: {}" , task );
271+ if (task != null ) {
272+ taskExecutor .process (task );
273+ } else {
274+ slotSupplier .releaseSlot (SlotReleaseReason .neverUsed (), finalPermit );
275+ }
276+ pollBackoffThrottler .success ();
277+ return null ;
278+ })
279+ .exceptionally (
280+ throwable -> {
281+ log .error ("Error while trying to poll task" , throwable );
282+ return null ;
283+ });
284+ } catch (PollTaskAsyncAbort ab ) {
285+ abort = true ;
286+ } catch (Throwable e ) {
287+ if (e instanceof InterruptedException ) {
288+ // we restore the flag here, so it can be checked and processed (with exit) in finally.
289+ Thread .currentThread ().interrupt ();
290+ }
291+ uncaughtExceptionHandler .uncaughtException (Thread .currentThread (), e );
292+ } finally {
293+ // release the slot if it was acquired, but a poll request was not made
294+ if (!pollRequestMade ) {
295+ if (permit != null ) {
296+ slotSupplier .releaseSlot (SlotReleaseReason .neverUsed (), permit );
297+ }
298+ if (pollerSemaphoreAcquired ) {
247299 pollerSemaphore .release ();
248- pollScaleReportHandle .report (task , e );
249- if (e != null ) {
250- uncaughtExceptionHandler .uncaughtException (Thread .currentThread (), e );
251- pollBackoffThrottler .failure (
252- (e instanceof StatusRuntimeException )
253- ? ((StatusRuntimeException ) e ).getStatus ().getCode ()
254- : Status .Code .UNKNOWN );
255- slotSupplier .releaseSlot (SlotReleaseReason .neverUsed (), finalPermit );
256- return null ;
257- }
258- log .trace ("$$$$$ Picked up task: {}" , task );
259- if (task != null ) {
260- taskExecutor .process (task );
261- } else {
262- slotSupplier .releaseSlot (SlotReleaseReason .neverUsed (), finalPermit );
263- }
264- pollBackoffThrottler .success ();
265- return null ;
266- });
267- } catch (PollTaskAsyncAbort ab ) {
268- abort = true ;
269- } catch (Throwable e ) {
270- if (e instanceof InterruptedException ) {
271- // we restore the flag here, so it can be checked and processed (with exit) in finally.
272- Thread .currentThread ().interrupt ();
273- }
274- uncaughtExceptionHandler .uncaughtException (Thread .currentThread (), e );
275- } finally {
276- // release the slot if it was acquired, but a poll request was not made
277- if (!pollRequestMade && permit != null ) {
278- slotSupplier .releaseSlot (SlotReleaseReason .taskComplete (), permit );
279- }
280- if (!pollRequestMade && pollerSemaphoreAcquired ) {
281- pollerSemaphore .release ();
282- }
300+ }
301+ }
283302
284- if (!shouldTerminate () && !abort ) {
285- // Resubmit itself back to pollExecutor
286- pollExecutor .execute (this );
287- } else {
288- balancer .removePoller (this );
289- log .info ("poll loop is terminated: {}" , asyncTaskPoller .getClass ().getSimpleName ());
303+ if (shouldTerminate ()) {
304+ log .info (
305+ "ABORT {} permit {}, pollRequestMade {}" ,
306+ asyncTaskPoller .getLabel (),
307+ permit ,
308+ pollRequestMade );
309+ balancer .removePoller (asyncTaskPoller .getLabel ());
310+ abort = true ;
311+ }
290312 }
291313 }
314+ log .info (
315+ "poll loop is terminated: {}, slotsReserved {}" ,
316+ asyncTaskPoller .getLabel (),
317+ slotsReserved );
292318 }
293319 }
294320
295321 class PollQueueBalancer {
296- Map <PollQueueTask , AtomicInteger > taskCounts = new HashMap <>();
322+ Map <String , AtomicInteger > taskCounts = new HashMap <>();
297323 private final Lock balancerLock = new ReentrantLock ();
298324 private final Condition balancerCondition = balancerLock .newCondition ();
299325
300- void startPoll (PollQueueTask task ) {
326+ void startPoll (String pollerName ) {
301327 balancerLock .lock ();
302- int currentPolls = taskCounts .get (task ).addAndGet (1 );
328+ int currentPolls = taskCounts .get (pollerName ).addAndGet (1 );
303329 if (currentPolls == 1 ) {
304330 balancerCondition .signalAll ();
305331 }
306332 balancerLock .unlock ();
307333 }
308334
309- void endPoll (PollQueueTask task ) {
335+ void endPoll (String pollerName ) {
310336 balancerLock .lock ();
311- int currentPolls = taskCounts .get (task ).addAndGet (-1 );
337+ if (!taskCounts .containsKey (pollerName )) {
338+ log .info ("endPoll has null key for tasks {}" , pollerName );
339+ balancerLock .unlock ();
340+ return ;
341+ }
342+ int currentPolls = taskCounts .get (pollerName ).addAndGet (-1 );
312343 if (currentPolls == 0 ) {
313344 balancerCondition .signalAll ();
314345 }
315346 balancerLock .unlock ();
316347 }
317348
318- void addPoller (PollQueueTask task ) {
349+ void addPoller (String pollerName ) {
319350 balancerLock .lock ();
320- taskCounts .putIfAbsent (task , new AtomicInteger (0 ));
351+ log .info ("addPoller {}" , pollerName );
352+ taskCounts .put (pollerName , new AtomicInteger (0 ));
353+ if (!taskCounts .containsKey (pollerName )) {
354+ log .info ("????? has null key for task {}" , pollerName );
355+ balancerLock .unlock ();
356+ return ;
357+ }
321358 balancerCondition .signalAll ();
322359 balancerLock .unlock ();
323360 }
324361
325- void removePoller (PollQueueTask task ) {
362+ void removePoller (String pollerName ) {
326363 balancerLock .lock ();
327- taskCounts .remove (task );
364+ taskCounts .remove (pollerName );
328365 balancerCondition .signalAll ();
329366 balancerLock .unlock ();
330367 }
331368
332369 /** Ensure that at least one poller is running for each task. */
333- void balance (PollQueueTask t ) throws InterruptedException {
370+ void balance (String p ) throws InterruptedException {
334371 while (!shouldTerminate ()) {
335372 balancerLock .lock ();
336373 // If this poller has no tasks then we can unblock immediately
337- if (taskCounts .get (t ).get () == 0 ) {
374+ if (taskCounts .get (p ).get () == 0 ) {
338375 balancerLock .unlock ();
339376 return ;
340377 }
341378 // Check if all tasks have at least one poll request
342- for (PollQueueTask task : taskCounts .keySet ()) {
343- if (taskCounts .get (task ).get () == 0 ) {
344- balancerCondition .await ();
379+ boolean allOtherTasksHavePolls = true ;
380+ for (String task : taskCounts .keySet ()) {
381+ if (!Objects .equals (task , p ) && taskCounts .get (task ).get () == 0 ) {
382+ allOtherTasksHavePolls = false ;
383+ break ;
345384 }
346385 }
386+ if (!allOtherTasksHavePolls ) {
387+ balancerCondition .await ();
388+ } else {
389+ balancerLock .unlock ();
390+ return ;
391+ }
347392 }
348393 }
349394 }
0 commit comments