Skip to content

Commit b19042b

Browse files
authored
Expose ShutdownManager poll interval via WorkerFactoryOptions (#2876)
1 parent 1e110b2 commit b19042b

5 files changed

Lines changed: 135 additions & 5 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,23 @@ public class ShutdownManager implements Closeable {
2121
new ExecutorThreadFactory(
2222
WorkerThreadsNameHelper.SHUTDOWN_MANAGER_THREAD_NAME_PREFIX, null));
2323

24-
private static final int CHECK_PERIOD_MS = 250;
24+
private static final int DEFAULT_CHECK_PERIOD_MS = 250;
25+
private final int checkPeriodMs;
26+
27+
public ShutdownManager() {
28+
this(DEFAULT_CHECK_PERIOD_MS);
29+
}
30+
31+
/**
32+
* @param checkPeriodMs interval in milliseconds between shutdown-completion polls. Lower values
33+
* speed up teardown at the cost of more frequent polling. Must be positive.
34+
*/
35+
public ShutdownManager(int checkPeriodMs) {
36+
if (checkPeriodMs <= 0) {
37+
throw new IllegalArgumentException("checkPeriodMs must be positive, was: " + checkPeriodMs);
38+
}
39+
this.checkPeriodMs = checkPeriodMs;
40+
}
2541

2642
/** executorToShutdown.shutdownNow() -&gt; timed wait for a graceful termination */
2743
public CompletableFuture<Void> shutdownExecutorNow(
@@ -97,7 +113,7 @@ private CompletableFuture<Void> untimedWait(
97113
*/
98114
private CompletableFuture<Void> limitedWait(
99115
ExecutorService executorToShutdown, String executorName, Duration timeout) {
100-
int attempts = (int) Math.ceil((double) timeout.toMillis() / CHECK_PERIOD_MS);
116+
int attempts = (int) Math.ceil((double) timeout.toMillis() / checkPeriodMs);
101117

102118
CompletableFuture<Void> future = new CompletableFuture<>();
103119
scheduledExecutorService.submit(
@@ -167,7 +183,7 @@ public void run() {
167183
promise.complete(null);
168184
return;
169185
}
170-
scheduledExecutorService.schedule(this, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
186+
scheduledExecutorService.schedule(this, checkPeriodMs, TimeUnit.MILLISECONDS);
171187
}
172188

173189
abstract boolean isTerminated();
@@ -238,7 +254,7 @@ public void run() {
238254
onSlowTermination();
239255
}
240256
}
241-
scheduledExecutorService.schedule(this, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
257+
scheduledExecutorService.schedule(this, checkPeriodMs, TimeUnit.MILLISECONDS);
242258
}
243259

244260
abstract boolean isTerminated();

temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,8 @@ private void shutdownInternal(boolean interruptUserTasks) {
411411

412412
/** Internal method that actually shuts down workers. Called from the plugin chain. */
413413
private void doShutdown(boolean interruptUserTasks) {
414-
ShutdownManager shutdownManager = new ShutdownManager();
414+
ShutdownManager shutdownManager =
415+
new ShutdownManager((int) factoryOptions.getShutdownCheckInterval().toMillis());
415416

416417
// Shutdown each worker with plugin hooks
417418
List<CompletableFuture<Void>> shutdownFutures = new ArrayList<>();

temporal-sdk/src/main/java/io/temporal/worker/WorkerFactoryOptions.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public static WorkerFactoryOptions getDefaultInstance() {
2424

2525
private static final int DEFAULT_WORKFLOW_CACHE_SIZE = 600;
2626
private static final int DEFAULT_MAX_WORKFLOW_THREAD_COUNT = 600;
27+
private static final Duration DEFAULT_SHUTDOWN_CHECK_INTERVAL = Duration.ofMillis(250);
2728

2829
private static final WorkerFactoryOptions DEFAULT_INSTANCE;
2930

@@ -41,6 +42,7 @@ public static class Builder {
4142
private boolean enableLoggingInReplay;
4243
private boolean usingVirtualWorkflowThreads;
4344
private ExecutorService overrideLocalActivityTaskExecutor;
45+
private Duration shutdownCheckInterval;
4446

4547
private Builder() {}
4648

@@ -57,6 +59,7 @@ private Builder(WorkerFactoryOptions options) {
5759
this.enableLoggingInReplay = options.enableLoggingInReplay;
5860
this.usingVirtualWorkflowThreads = options.usingVirtualWorkflowThreads;
5961
this.overrideLocalActivityTaskExecutor = options.overrideLocalActivityTaskExecutor;
62+
this.shutdownCheckInterval = options.shutdownCheckInterval;
6063
}
6164

6265
/**
@@ -155,6 +158,22 @@ Builder setOverrideLocalActivityTaskExecutor(
155158
return this;
156159
}
157160

161+
/**
162+
* Sets the interval between polls when checking for executor termination during shutdown. Lower
163+
* values speed up shutdown at the cost of more frequent polling.
164+
*
165+
* <p>Default is 250ms, which is suitable for production. For test environments, consider
166+
* setting a much lower value (e.g., 1ms) to minimize teardown overhead.
167+
*
168+
* @param shutdownCheckInterval the interval between shutdown polls. Must be positive.
169+
* @return this builder for chaining
170+
*/
171+
@Experimental
172+
public Builder setShutdownCheckInterval(Duration shutdownCheckInterval) {
173+
this.shutdownCheckInterval = shutdownCheckInterval;
174+
return this;
175+
}
176+
158177
public WorkerFactoryOptions build() {
159178
return new WorkerFactoryOptions(
160179
workflowCacheSize,
@@ -165,6 +184,7 @@ public WorkerFactoryOptions build() {
165184
enableLoggingInReplay,
166185
usingVirtualWorkflowThreads,
167186
overrideLocalActivityTaskExecutor,
187+
shutdownCheckInterval,
168188
false);
169189
}
170190

@@ -189,6 +209,7 @@ public WorkerFactoryOptions validateAndBuildWithDefaults() {
189209
enableLoggingInReplay,
190210
usingVirtualWorkflowThreads,
191211
overrideLocalActivityTaskExecutor,
212+
shutdownCheckInterval,
192213
true);
193214
}
194215
}
@@ -201,6 +222,7 @@ public WorkerFactoryOptions validateAndBuildWithDefaults() {
201222
private final boolean enableLoggingInReplay;
202223
private final boolean usingVirtualWorkflowThreads;
203224
private final ExecutorService overrideLocalActivityTaskExecutor;
225+
private final Duration shutdownCheckInterval;
204226

205227
private WorkerFactoryOptions(
206228
int workflowCacheSize,
@@ -211,6 +233,7 @@ private WorkerFactoryOptions(
211233
boolean enableLoggingInReplay,
212234
boolean usingVirtualWorkflowThreads,
213235
ExecutorService overrideLocalActivityTaskExecutor,
236+
Duration shutdownCheckInterval,
214237
boolean validate) {
215238
if (validate) {
216239
Preconditions.checkState(workflowCacheSize >= 0, "negative workflowCacheSize");
@@ -233,6 +256,13 @@ private WorkerFactoryOptions(
233256
if (plugins == null) {
234257
plugins = new WorkerPlugin[0];
235258
}
259+
if (shutdownCheckInterval != null) {
260+
Preconditions.checkState(
261+
!shutdownCheckInterval.isNegative() && !shutdownCheckInterval.isZero(),
262+
"shutdownCheckInterval must be positive");
263+
} else {
264+
shutdownCheckInterval = DEFAULT_SHUTDOWN_CHECK_INTERVAL;
265+
}
236266
}
237267
this.workflowCacheSize = workflowCacheSize;
238268
this.maxWorkflowThreadCount = maxWorkflowThreadCount;
@@ -243,6 +273,7 @@ private WorkerFactoryOptions(
243273
this.enableLoggingInReplay = enableLoggingInReplay;
244274
this.usingVirtualWorkflowThreads = usingVirtualWorkflowThreads;
245275
this.overrideLocalActivityTaskExecutor = overrideLocalActivityTaskExecutor;
276+
this.shutdownCheckInterval = shutdownCheckInterval;
246277
}
247278

248279
public int getWorkflowCacheSize() {
@@ -291,6 +322,16 @@ ExecutorService getOverrideLocalActivityTaskExecutor() {
291322
return overrideLocalActivityTaskExecutor;
292323
}
293324

325+
/**
326+
* Returns the interval between polls when checking for executor termination during shutdown.
327+
*
328+
* @return the configured shutdown check interval
329+
*/
330+
@Experimental
331+
public Duration getShutdownCheckInterval() {
332+
return shutdownCheckInterval;
333+
}
334+
294335
/**
295336
* @deprecated not used anymore by JavaSDK, this value doesn't have any effect
296337
*/
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.temporal.internal.worker;
2+
3+
import org.junit.Test;
4+
5+
public class ShutdownManagerTest {
6+
7+
@Test(expected = IllegalArgumentException.class)
8+
public void zeroPeriodIsRejected() {
9+
new ShutdownManager(0);
10+
}
11+
12+
@Test(expected = IllegalArgumentException.class)
13+
public void negativePeriodIsRejected() {
14+
new ShutdownManager(-1);
15+
}
16+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package io.temporal.worker;
2+
3+
import static org.junit.Assert.assertEquals;
4+
5+
import java.time.Duration;
6+
import org.junit.Test;
7+
8+
public class WorkerFactoryOptionsTest {
9+
10+
@Test
11+
public void shutdownCheckIntervalDefaultIs250ms() {
12+
WorkerFactoryOptions options = WorkerFactoryOptions.newBuilder().validateAndBuildWithDefaults();
13+
assertEquals(Duration.ofMillis(250), options.getShutdownCheckInterval());
14+
}
15+
16+
@Test
17+
public void shutdownCheckIntervalCanBeSet() {
18+
Duration interval = Duration.ofMillis(5);
19+
WorkerFactoryOptions options =
20+
WorkerFactoryOptions.newBuilder().setShutdownCheckInterval(interval).build();
21+
assertEquals(interval, options.getShutdownCheckInterval());
22+
}
23+
24+
@Test
25+
public void shutdownCheckIntervalSurvivesValidateAndBuild() {
26+
Duration interval = Duration.ofMillis(10);
27+
WorkerFactoryOptions options =
28+
WorkerFactoryOptions.newBuilder()
29+
.setShutdownCheckInterval(interval)
30+
.validateAndBuildWithDefaults();
31+
assertEquals(interval, options.getShutdownCheckInterval());
32+
}
33+
34+
@Test
35+
public void shutdownCheckIntervalSurvivesCopyBuilder() {
36+
Duration interval = Duration.ofMillis(3);
37+
WorkerFactoryOptions original =
38+
WorkerFactoryOptions.newBuilder().setShutdownCheckInterval(interval).build();
39+
WorkerFactoryOptions copy = WorkerFactoryOptions.newBuilder(original).build();
40+
assertEquals(interval, copy.getShutdownCheckInterval());
41+
}
42+
43+
@Test(expected = IllegalStateException.class)
44+
public void shutdownCheckIntervalRejectsZero() {
45+
WorkerFactoryOptions.newBuilder()
46+
.setShutdownCheckInterval(Duration.ZERO)
47+
.validateAndBuildWithDefaults();
48+
}
49+
50+
@Test(expected = IllegalStateException.class)
51+
public void shutdownCheckIntervalRejectsNegative() {
52+
WorkerFactoryOptions.newBuilder()
53+
.setShutdownCheckInterval(Duration.ofMillis(-1))
54+
.validateAndBuildWithDefaults();
55+
}
56+
}

0 commit comments

Comments
 (0)