Skip to content

Commit d92092b

Browse files
Add more validation
1 parent e3e609a commit d92092b

6 files changed

Lines changed: 142 additions & 42 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncPoller.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.temporal.worker.tuning.SlotSupplierFuture;
1212
import java.util.Collections;
1313
import java.util.List;
14+
import java.util.Objects;
1415
import java.util.concurrent.*;
1516
import org.slf4j.Logger;
1617
import org.slf4j.LoggerFactory;
@@ -28,6 +29,8 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
2829
private final PollerBehaviorAutoscaling pollerBehavior;
2930
private final Scope workerMetricsScope;
3031
private Throttler pollRateThrottler;
32+
private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
33+
new PollerUncaughtExceptionHandler();
3134

3235
AsyncPoller(
3336
TrackingSlotSupplier<?> slotSupplier,
@@ -53,18 +56,26 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
5356
PollerOptions pollerOptions,
5457
Scope workerMetricsScope) {
5558
super(pollTaskExecutor);
59+
Objects.requireNonNull(slotSupplier, "slot supplier cannot be null");
60+
Objects.requireNonNull(slotReservationData, "slot reservation data should not be null");
61+
Objects.requireNonNull(asyncTaskPollers, "async tasK pollers should not be null");
62+
if (asyncTaskPollers.isEmpty()) {
63+
throw new IllegalArgumentException("async task pollers must contain at least one poller");
64+
}
65+
Objects.requireNonNull(pollerOptions, "pollerOptions should not be null");
66+
Objects.requireNonNull(workerMetricsScope, "workerMetricsScope should not be null");
5667
this.slotSupplier = slotSupplier;
5768
this.slotReservationData = slotReservationData;
5869
this.asyncTaskPollers = asyncTaskPollers;
59-
this.pollerOptions = pollerOptions;
60-
this.workerMetricsScope = workerMetricsScope;
6170
if (!(pollerOptions.getPollerBehavior() instanceof PollerBehaviorAutoscaling)) {
6271
throw new IllegalArgumentException(
6372
"PollerBehavior "
6473
+ pollerOptions.getPollerBehavior()
6574
+ " is not supported. Only PollerBehaviorSimpleMaximum is supported.");
6675
}
67-
pollerBehavior = (PollerBehaviorAutoscaling) pollerOptions.getPollerBehavior();
76+
this.pollerBehavior = (PollerBehaviorAutoscaling) pollerOptions.getPollerBehavior();
77+
this.pollerOptions = pollerOptions;
78+
this.workerMetricsScope = workerMetricsScope;
6879
}
6980

7081
@Override
@@ -77,8 +88,14 @@ public boolean start() {
7788
pollerOptions.getMaximumPollRatePerSecond(),
7889
pollerOptions.getMaximumPollRateIntervalMilliseconds());
7990
}
80-
// TODO Set thread factory
81-
ScheduledExecutorService exec = Executors.newScheduledThreadPool(asyncTaskPollers.size() + 1);
91+
// Each poller will have its own thread and one thread will be used to schedule the scale
92+
// reporters
93+
ScheduledExecutorService exec =
94+
Executors.newScheduledThreadPool(
95+
asyncTaskPollers.size() + 1,
96+
new ExecutorThreadFactory(
97+
pollerOptions.getPollThreadNamePrefix(),
98+
pollerOptions.getUncaughtExceptionHandler()));
8299
for (PollTaskAsync<T> asyncTaskPoller : asyncTaskPollers) {
83100
AdjustableSemaphore pollerSemaphore = new AdjustableSemaphore();
84101
pollerSemaphore.setMaxPermits(pollerBehavior.getInitialMaxConcurrentTaskPollers());
@@ -190,7 +207,11 @@ public void run() {
190207
return null;
191208
});
192209
} catch (Throwable e) {
193-
210+
if (e instanceof InterruptedException) {
211+
// we restore the flag here, so it can be checked and processed (with exit) in finally.
212+
Thread.currentThread().interrupt();
213+
}
214+
uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
194215
} finally {
195216
if (!shouldTerminate()) {
196217
// Resubmit itself back to pollExecutor

temporal-sdk/src/main/java/io/temporal/internal/worker/BasePoller.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.temporal.internal.worker;
22

3+
import io.grpc.Status;
34
import io.grpc.StatusRuntimeException;
45
import io.temporal.internal.common.GrpcUtils;
56
import io.temporal.worker.tuning.SlotPermit;
@@ -166,4 +167,39 @@ static boolean shouldIgnoreDuringShutdown(Throwable ex) {
166167
// javadoc.
167168
|| ex.getCause() instanceof InterruptedException;
168169
}
170+
171+
protected final class PollerUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
172+
173+
@Override
174+
public void uncaughtException(Thread t, Throwable e) {
175+
if (!pollExecutor.isShutdown() || !shouldIgnoreDuringShutdown(e)) {
176+
logPollErrors(t, e);
177+
} else {
178+
logPollExceptionsSuppressedDuringShutdown(t, e);
179+
}
180+
}
181+
182+
private void logPollErrors(Thread t, Throwable e) {
183+
if (e instanceof StatusRuntimeException) {
184+
StatusRuntimeException te = (StatusRuntimeException) e;
185+
if (te.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED) {
186+
log.info("DEADLINE_EXCEEDED in poller thread {}", t.getName(), e);
187+
return;
188+
}
189+
}
190+
log.warn("Failure in poller thread {}", t.getName(), e);
191+
}
192+
193+
/**
194+
* Some exceptions are considered normal during shutdown {@link #shouldIgnoreDuringShutdown} and
195+
* we log them in the most quiet manner.
196+
*
197+
* @param t thread where the exception happened
198+
* @param e the exception itself
199+
*/
200+
private void logPollExceptionsSuppressedDuringShutdown(Thread t, Throwable e) {
201+
log.trace(
202+
"Failure in thread {} is suppressed, considered normal during shutdown", t.getName(), e);
203+
}
204+
}
169205
}

temporal-sdk/src/main/java/io/temporal/internal/worker/MultiThreadedPoller.java

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -210,39 +210,4 @@ public void run() throws Exception {
210210
}
211211
}
212212
}
213-
214-
private final class PollerUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
215-
216-
@Override
217-
public void uncaughtException(Thread t, Throwable e) {
218-
if (!pollExecutor.isShutdown() || !shouldIgnoreDuringShutdown(e)) {
219-
logPollErrors(t, e);
220-
} else {
221-
logPollExceptionsSuppressedDuringShutdown(t, e);
222-
}
223-
}
224-
225-
private void logPollErrors(Thread t, Throwable e) {
226-
if (e instanceof StatusRuntimeException) {
227-
StatusRuntimeException te = (StatusRuntimeException) e;
228-
if (te.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED) {
229-
log.info("DEADLINE_EXCEEDED in poller thread {}", t.getName(), e);
230-
return;
231-
}
232-
}
233-
log.warn("Failure in poller thread {}", t.getName(), e);
234-
}
235-
236-
/**
237-
* Some exceptions are considered normal during shutdown {@link #shouldIgnoreDuringShutdown} and
238-
* we log them in the most quiet manner.
239-
*
240-
* @param t thread where the exception happened
241-
* @param e the exception itself
242-
*/
243-
private void logPollExceptionsSuppressedDuringShutdown(Thread t, Throwable e) {
244-
log.trace(
245-
"Failure in thread {} is suppressed, considered normal during shutdown", t.getName(), e);
246-
}
247-
}
248213
}

temporal-sdk/src/main/java/io/temporal/internal/worker/PollScaleReportHandle.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
import org.slf4j.Logger;
88
import org.slf4j.LoggerFactory;
99

10+
/**
11+
* PollScaleReportHandle is responsible for managing the scaling of pollers based on the scaling
12+
* feedback attached to the task by the server.
13+
*/
1014
@ThreadSafe
1115
public class PollScaleReportHandle<T extends ScalingTask> implements Runnable {
1216
private static final Logger logger = LoggerFactory.getLogger(PollScaleReportHandle.class);

temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -493,37 +493,52 @@ public Builder setDeploymentOptions(WorkerDeploymentOptions deploymentOptions) {
493493
return this;
494494
}
495495

496-
/** Set the poller behavior for workflow task pollers. */
496+
/**
497+
* Set the poller behavior for workflow task pollers.
498+
*
499+
* <p>If the sticky queue is enabled, the poller behavior will be used for the sticky queue as
500+
* well.
501+
*/
497502
@Experimental
498503
public Builder setWorkflowTaskPollersBehaviour(PollerBehaviorAutoscaling pollerBehavior) {
499504
this.workflowTaskPollersBehaviour = pollerBehavior;
500505
return this;
501506
}
502507

508+
/**
509+
* Set the poller behavior for workflow task pollers.
510+
*
511+
* <p>If the sticky queue is enabled, the poller behavior will be used for the sticky queue as
512+
* well.
513+
*/
503514
@Experimental
504515
public Builder setWorkflowTaskPollersBehaviour(PollerBehaviorSimpleMaximum pollerBehavior) {
505516
this.workflowTaskPollersBehaviour = pollerBehavior;
506517
return this;
507518
}
508519

520+
/** Set the poller behavior for activity task pollers. */
509521
@Experimental
510522
public Builder setActivityTaskPollersBehaviour(PollerBehaviorAutoscaling pollerBehavior) {
511523
this.activityTaskPollersBehaviour = pollerBehavior;
512524
return this;
513525
}
514526

527+
/** Set the poller behavior for activity task pollers. */
515528
@Experimental
516529
public Builder setActivityTaskPollersBehaviour(PollerBehaviorSimpleMaximum pollerBehavior) {
517530
this.activityTaskPollersBehaviour = pollerBehavior;
518531
return this;
519532
}
520533

534+
/** Set the poller behavior for nexus task pollers. */
521535
@Experimental
522536
public Builder setNexusTaskPollersBehaviour(PollerBehaviorAutoscaling pollerBehavior) {
523537
this.nexusTaskPollersBehaviour = pollerBehavior;
524538
return this;
525539
}
526540

541+
/** Set the poller behavior for nexus task pollers. */
527542
@Experimental
528543
public Builder setNexusTaskPollersBehaviour(PollerBehaviorSimpleMaximum pollerBehavior) {
529544
this.nexusTaskPollersBehaviour = pollerBehavior;
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package io.temporal.internal.worker;
2+
3+
import io.grpc.Status;
4+
import io.grpc.StatusRuntimeException;
5+
import io.temporal.workflow.Functions;
6+
import org.junit.Test;
7+
import org.mockito.Mockito;
8+
9+
public class PollScaleReportHandleTest {
10+
11+
@Test
12+
public void handleResourceExhaustedError() {
13+
// Mock dependencies
14+
Functions.Proc1<Integer> mockScaleCallback = Mockito.mock(Functions.Proc1.class);
15+
PollScaleReportHandle<ScalingTask> handle =
16+
new PollScaleReportHandle<>(1, 10, 8, mockScaleCallback);
17+
18+
// Simulate RESOURCE_EXHAUSTED error
19+
StatusRuntimeException exception = new StatusRuntimeException(Status.RESOURCE_EXHAUSTED);
20+
handle.report(null, exception);
21+
22+
// Verify target poller count is halved and callback is invoked
23+
Mockito.verify(mockScaleCallback).apply(4);
24+
}
25+
26+
@Test
27+
public void handleGenericError() {
28+
// Mock dependencies
29+
Functions.Proc1<Integer> mockScaleCallback = Mockito.mock(Functions.Proc1.class);
30+
PollScaleReportHandle<ScalingTask> handle =
31+
new PollScaleReportHandle<>(1, 10, 5, mockScaleCallback);
32+
33+
// Simulate a generic error
34+
handle.report(null, new RuntimeException("Generic error"));
35+
36+
// Verify target poller count is decremented and callback is invoked
37+
Mockito.verify(mockScaleCallback).apply(4);
38+
}
39+
40+
@Test
41+
public void applyScalingDecisionDeltaWhenAllowed() {
42+
// Mock dependencies
43+
Functions.Proc1<Integer> mockScaleCallback = Mockito.mock(Functions.Proc1.class);
44+
ScalingTask mockTask = Mockito.mock(ScalingTask.class);
45+
ScalingTask.ScalingDecision mockDecision = Mockito.mock(ScalingTask.ScalingDecision.class);
46+
Mockito.when(mockTask.getScalingDecision()).thenReturn(mockDecision);
47+
Mockito.when(mockDecision.getPollRequestDeltaSuggestion()).thenReturn(3);
48+
49+
PollScaleReportHandle<ScalingTask> handle =
50+
new PollScaleReportHandle<>(1, 10, 5, mockScaleCallback);
51+
handle.run(); // Enable scale-up
52+
53+
// Report a task with a scaling decision
54+
handle.report(mockTask, null);
55+
56+
// Verify target poller count is updated and callback is invoked
57+
Mockito.verify(mockScaleCallback).apply(8);
58+
}
59+
}

0 commit comments

Comments
 (0)