Skip to content

Commit e3b291e

Browse files
respond to PR comments
1 parent 375e851 commit e3b291e

2 files changed

Lines changed: 13 additions & 9 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/worker/AdjustableSemaphore.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@ public final class AdjustableSemaphore {
1414
*/
1515
private int maxPermits = 0;
1616

17-
/** New instances should be configured with setMaxPermits(). */
17+
/**
18+
* Create a new adjustable semaphore with the given number of initial permits.
19+
*
20+
* @param initialPermits the initial number of permits, must be at least 1
21+
*/
1822
public AdjustableSemaphore(int initialPermits) {
1923
if (initialPermits < 1) {
2024
throw new IllegalArgumentException(
@@ -48,8 +52,8 @@ synchronized void setMaxPermits(int newMax) {
4852
this.semaphore.release(delta);
4953
} else {
5054
// delta < 0.
51-
// reducePermits needs a positive #, though.
52-
this.semaphore.reducePermits(delta * -1);
55+
// reducePermits needs a positive #
56+
this.semaphore.reducePermits(Math.abs(delta));
5357
}
5458

5559
this.maxPermits = newMax;

temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncPoller.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -316,13 +316,13 @@ public void run() {
316316
*/
317317
@ThreadSafe
318318
class PollQueueBalancer {
319-
Map<String, AtomicInteger> taskCounts = new HashMap<>();
319+
Map<String, Integer> taskCounts = new HashMap<>();
320320
private final Lock balancerLock = new ReentrantLock();
321321
private final Condition balancerCondition = balancerLock.newCondition();
322322

323323
void startPoll(String pollerName) {
324324
balancerLock.lock();
325-
int currentPolls = taskCounts.get(pollerName).addAndGet(1);
325+
Integer currentPolls = taskCounts.compute(pollerName, (k, v) -> v + 1);
326326
if (currentPolls == 1) {
327327
balancerCondition.signalAll();
328328
}
@@ -335,7 +335,7 @@ void endPoll(String pollerName) {
335335
balancerLock.unlock();
336336
return;
337337
}
338-
int currentPolls = taskCounts.get(pollerName).addAndGet(-1);
338+
Integer currentPolls = taskCounts.compute(pollerName, (k, v) -> v - 1);
339339
if (currentPolls == 0) {
340340
balancerCondition.signalAll();
341341
}
@@ -344,7 +344,7 @@ void endPoll(String pollerName) {
344344

345345
void addPoller(String pollerName) {
346346
balancerLock.lock();
347-
taskCounts.put(pollerName, new AtomicInteger(0));
347+
taskCounts.put(pollerName, 0);
348348
balancerCondition.signalAll();
349349
balancerLock.unlock();
350350
}
@@ -362,13 +362,13 @@ void balance(String p) throws InterruptedException {
362362
balancerLock.lock();
363363
try {
364364
// If this poller has no tasks then we can unblock immediately
365-
if (taskCounts.get(p).get() == 0) {
365+
if (taskCounts.get(p) == 0) {
366366
return;
367367
}
368368
// Check if all tasks have at least one poll request
369369
boolean allOtherTasksHavePolls = true;
370370
for (String task : taskCounts.keySet()) {
371-
if (!Objects.equals(task, p) && taskCounts.get(task).get() == 0) {
371+
if (!Objects.equals(task, p) && taskCounts.get(task) == 0) {
372372
allOtherTasksHavePolls = false;
373373
break;
374374
}

0 commit comments

Comments
 (0)