Skip to content

Commit b14918b

Browse files
committed
Review comments
1 parent 646981c commit b14918b

File tree

4 files changed

+17
-19
lines changed

4 files changed

+17
-19
lines changed

temporal-sdk/src/main/java/io/temporal/internal/worker/BasePoller.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean
6565
// When graceful poll shutdown is enabled, the server will complete outstanding polls with
6666
// empty responses after ShutdownWorker is called. We simply wait for polls to return.
6767
pollExecutorShutdown =
68-
shutdownManager.shutdownExecutorUntimed(pollExecutor, this + "#pollExecutor");
68+
shutdownManager.shutdownExecutor(
69+
pollExecutor, this + "#pollExecutor", Duration.ofSeconds(80));
6970
} else {
7071
// Old behaviour forcibly stops outstanding polls.
7172
pollExecutorShutdown =
Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.temporal.internal.worker;
22

3+
import io.temporal.api.namespace.v1.NamespaceInfo.Capabilities;
34
import java.util.concurrent.atomic.AtomicBoolean;
45

56
/**
@@ -11,19 +12,20 @@ public final class NamespaceCapabilities {
1112
private final AtomicBoolean pollerAutoscaling = new AtomicBoolean(false);
1213
private final AtomicBoolean gracefulPollShutdown = new AtomicBoolean(false);
1314

14-
public boolean isPollerAutoscaling() {
15-
return pollerAutoscaling.get();
15+
public void setFromCapabilities(Capabilities capabilities) {
16+
if (capabilities.getPollerAutoscaling()) {
17+
pollerAutoscaling.set(true);
18+
}
19+
if (capabilities.getWorkerPollCompleteOnShutdown()) {
20+
gracefulPollShutdown.set(true);
21+
}
1622
}
1723

18-
public void setPollerAutoscaling(boolean value) {
19-
pollerAutoscaling.set(value);
24+
public boolean isPollerAutoscaling() {
25+
return pollerAutoscaling.get();
2026
}
2127

2228
public boolean isGracefulPollShutdown() {
2329
return gracefulPollShutdown.get();
2430
}
25-
26-
public void setGracefulPollShutdown(boolean value) {
27-
gracefulPollShutdown.set(value);
28-
}
2931
}

temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -265,15 +265,8 @@ public synchronized void start() {
265265
DescribeNamespaceRequest.newBuilder()
266266
.setNamespace(workflowClient.getOptions().getNamespace())
267267
.build());
268-
if (describeNamespaceResponse.getNamespaceInfo().getCapabilities().getPollerAutoscaling()) {
269-
namespaceCapabilities.setPollerAutoscaling(true);
270-
}
271-
if (describeNamespaceResponse
272-
.getNamespaceInfo()
273-
.getCapabilities()
274-
.getWorkerPollCompleteOnShutdown()) {
275-
namespaceCapabilities.setGracefulPollShutdown(true);
276-
}
268+
namespaceCapabilities.setFromCapabilities(
269+
describeNamespaceResponse.getNamespaceInfo().getCapabilities());
277270

278271
// Build plugin execution chain (reverse order for proper nesting)
279272
Consumer<WorkerFactory> startChain = WorkerFactory::doStart;

temporal-sdk/src/test/java/io/temporal/internal/worker/GracefulPollShutdownTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static org.junit.Assert.*;
44

55
import com.uber.m3.tally.NoopScope;
6+
import io.temporal.api.namespace.v1.NamespaceInfo.Capabilities;
67
import io.temporal.worker.tuning.PollerBehaviorSimpleMaximum;
78
import java.util.concurrent.CompletableFuture;
89
import java.util.concurrent.CountDownLatch;
@@ -30,7 +31,8 @@ public static Object[] data() {
3031
@Test(timeout = 10_000)
3132
public void inflightPollSurvivesShutdownOnlyWhenGraceful() throws Exception {
3233
NamespaceCapabilities capabilities = new NamespaceCapabilities();
33-
capabilities.setGracefulPollShutdown(graceful);
34+
capabilities.setFromCapabilities(
35+
Capabilities.newBuilder().setWorkerPollCompleteOnShutdown(true).build());
3436

3537
AtomicReference<String> processedTask = new AtomicReference<>();
3638
CountDownLatch taskProcessedLatch = new CountDownLatch(1);

0 commit comments

Comments
 (0)