Skip to content

Commit 44bb603

Browse files
authored
Shutdown task loss prevention (#2820)
1 parent d45886c commit 44bb603

27 files changed

Lines changed: 827 additions & 238 deletions

.github/workflows/ci.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ jobs:
6060
report_paths: "**/build/test-results/test/TEST-*.xml"
6161

6262
unit_test_jdk8:
63-
name: Unit test with docker service [JDK8]
63+
name: Unit test with CLI
6464
runs-on: ubuntu-latest-16-cores
6565
timeout-minutes: 30
6666
steps:
@@ -82,7 +82,7 @@ jobs:
8282
- name: Set up Gradle
8383
uses: gradle/actions/setup-gradle@ac396bf1a80af16236baf54bd7330ae21dc6ece5 # v6
8484

85-
- name: Start containerized server and dependencies
85+
- name: Start CLI server
8686
env:
8787
TEMPORAL_CLI_VERSION: 1.7.0
8888
run: |
@@ -110,6 +110,7 @@ jobs:
110110
--dynamic-config-value history.enableRequestIdRefLinks=true \
111111
--dynamic-config-value frontend.WorkerHeartbeatsEnabled=true \
112112
--dynamic-config-value frontend.ListWorkersEnabled=true \
113+
--dynamic-config-value frontend.enableCancelWorkerPollsOnShutdown=true \
113114
--dynamic-config-value 'component.callbacks.allowedAddresses=[{"Pattern":"localhost:7243","AllowInsecure":true}]' \
114115
--dynamic-config-value frontend.activityAPIsEnabled=true \
115116
--dynamic-config-value activity.enableStandalone=true \

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public ActivityPollTask(
3737
@Nonnull String namespace,
3838
@Nonnull String taskQueue,
3939
@Nonnull String identity,
40+
@Nonnull String workerInstanceKey,
4041
@Nonnull WorkerVersioningOptions versioningOptions,
4142
double activitiesPerSecond,
4243
@Nonnull TrackingSlotSupplier<ActivitySlotInfo> slotSupplier,
@@ -53,6 +54,7 @@ public ActivityPollTask(
5354
.setNamespace(namespace)
5455
.setIdentity(identity)
5556
.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue));
57+
pollRequest.setWorkerInstanceKey(workerInstanceKey);
5658
if (activitiesPerSecond > 0) {
5759
pollRequest.setTaskQueueMetadata(
5860
TaskQueueMetadata.newBuilder()

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ public boolean start() {
105105
namespace,
106106
taskQueue,
107107
options.getIdentity(),
108+
options.getWorkerInstanceKey(),
108109
options.getWorkerVersioningOptions(),
109110
taskQueueActivitiesPerSecond,
110111
this.slotSupplier,
@@ -113,7 +114,7 @@ public boolean start() {
113114
pollerTracker),
114115
this.pollTaskExecutor,
115116
pollerOptions,
116-
namespaceCapabilities.isPollerAutoscaling(),
117+
namespaceCapabilities,
117118
workerMetricsScope);
118119

119120
} else {
@@ -125,6 +126,7 @@ public boolean start() {
125126
namespace,
126127
taskQueue,
127128
options.getIdentity(),
129+
options.getWorkerInstanceKey(),
128130
options.getWorkerVersioningOptions(),
129131
taskQueueActivitiesPerSecond,
130132
this.slotSupplier,
@@ -133,7 +135,8 @@ public boolean start() {
133135
pollerTracker),
134136
this.pollTaskExecutor,
135137
pollerOptions,
136-
workerMetricsScope);
138+
workerMetricsScope,
139+
namespaceCapabilities);
137140
}
138141
poller.start();
139142
workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);

temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncActivityPollTask.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public AsyncActivityPollTask(
4343
@Nonnull String namespace,
4444
@Nonnull String taskQueue,
4545
@Nonnull String identity,
46+
@Nonnull String workerInstanceKey,
4647
@Nonnull WorkerVersioningOptions versioningOptions,
4748
double activitiesPerSecond,
4849
@Nonnull TrackingSlotSupplier<ActivitySlotInfo> slotSupplier,
@@ -59,6 +60,7 @@ public AsyncActivityPollTask(
5960
.setNamespace(namespace)
6061
.setIdentity(identity)
6162
.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue));
63+
pollRequest.setWorkerInstanceKey(workerInstanceKey);
6264
if (activitiesPerSecond > 0) {
6365
pollRequest.setTaskQueueMetadata(
6466
TaskQueueMetadata.newBuilder()

temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncNexusPollTask.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public AsyncNexusPollTask(
4141
@Nonnull String namespace,
4242
@Nonnull String taskQueue,
4343
@Nonnull String identity,
44+
@Nonnull String workerInstanceKey,
4445
@Nonnull WorkerVersioningOptions versioningOptions,
4546
@Nonnull Scope metricsScope,
4647
@Nonnull Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities,
@@ -57,6 +58,8 @@ public AsyncNexusPollTask(
5758
.setIdentity(identity)
5859
.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue));
5960

61+
pollRequest.setWorkerInstanceKey(workerInstanceKey);
62+
6063
if (versioningOptions.getWorkerDeploymentOptions() != null) {
6164
pollRequest.setDeploymentOptions(
6265
WorkerVersioningProtoUtils.deploymentOptionsToProto(

temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncPoller.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
2929
private final List<PollTaskAsync<T>> asyncTaskPollers;
3030
private final PollerOptions pollerOptions;
3131
private final PollerBehaviorAutoscaling pollerBehavior;
32-
private final boolean serverSupportsAutoscaling;
3332
private final Scope workerMetricsScope;
3433
private Throttler pollRateThrottler;
3534
private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
@@ -43,15 +42,15 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
4342
PollTaskAsync<T> asyncTaskPoller,
4443
ShutdownableTaskExecutor<T> taskExecutor,
4544
PollerOptions pollerOptions,
46-
boolean serverSupportsAutoscaling,
45+
NamespaceCapabilities namespaceCapabilities,
4746
Scope workerMetricsScope) {
4847
this(
4948
slotSupplier,
5049
slotReservationData,
5150
Collections.singletonList(asyncTaskPoller),
5251
taskExecutor,
5352
pollerOptions,
54-
serverSupportsAutoscaling,
53+
namespaceCapabilities,
5554
workerMetricsScope);
5655
}
5756

@@ -61,9 +60,9 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
6160
List<PollTaskAsync<T>> asyncTaskPollers,
6261
ShutdownableTaskExecutor<T> taskExecutor,
6362
PollerOptions pollerOptions,
64-
boolean serverSupportsAutoscaling,
63+
NamespaceCapabilities namespaceCapabilities,
6564
Scope workerMetricsScope) {
66-
super(taskExecutor);
65+
super(taskExecutor, namespaceCapabilities);
6766
Objects.requireNonNull(slotSupplier, "slotSupplier cannot be null");
6867
Objects.requireNonNull(slotReservationData, "slotReservation data should not be null");
6968
Objects.requireNonNull(asyncTaskPollers, "asyncTaskPollers should not be null");
@@ -82,7 +81,6 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
8281
+ " is not supported for AsyncPoller. Only PollerBehaviorAutoscaling is supported.");
8382
}
8483
this.pollerBehavior = (PollerBehaviorAutoscaling) pollerOptions.getPollerBehavior();
85-
this.serverSupportsAutoscaling = serverSupportsAutoscaling;
8684
this.pollerOptions = pollerOptions;
8785
this.workerMetricsScope = workerMetricsScope;
8886
}
@@ -114,7 +112,7 @@ public boolean start() {
114112
pollerBehavior.getMinConcurrentTaskPollers(),
115113
pollerBehavior.getMaxConcurrentTaskPollers(),
116114
pollerBehavior.getInitialConcurrentTaskPollers(),
117-
serverSupportsAutoscaling,
115+
namespaceCapabilities.isPollerAutoscaling(),
118116
(newTarget) -> {
119117
log.debug(
120118
"Updating maximum number of pollers for {} to: {}",
@@ -136,12 +134,14 @@ public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean
136134
return super.shutdown(shutdownManager, interruptTasks)
137135
.thenApply(
138136
(f) -> {
139-
for (PollTaskAsync<T> asyncTaskPoller : asyncTaskPollers) {
140-
try {
141-
log.debug("Shutting down async poller: {}", asyncTaskPoller.getLabel());
142-
asyncTaskPoller.cancel(new RuntimeException("Shutting down poller"));
143-
} catch (Throwable e) {
144-
log.error("Error while cancelling poll task", e);
137+
if (interruptTasks || !namespaceCapabilities.isGracefulPollShutdown()) {
138+
for (PollTaskAsync<T> asyncTaskPoller : asyncTaskPollers) {
139+
try {
140+
log.debug("Shutting down async poller: {}", asyncTaskPoller.getLabel());
141+
asyncTaskPoller.cancel(new RuntimeException("Shutting down poller"));
142+
} catch (Throwable e) {
143+
log.error("Error while cancelling poll task", e);
144+
}
145145
}
146146
}
147147
return null;

temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncWorkflowPollTask.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public AsyncWorkflowPollTask(
5252
@Nonnull String taskQueue,
5353
@Nullable String stickyTaskQueue,
5454
@Nonnull String identity,
55+
@Nonnull String workerInstanceKey,
5556
@Nonnull WorkerVersioningOptions versioningOptions,
5657
@Nonnull TrackingSlotSupplier<WorkflowSlotInfo> slotSupplier,
5758
@Nonnull Scope metricsScope,
@@ -67,6 +68,8 @@ public AsyncWorkflowPollTask(
6768
.setNamespace(Objects.requireNonNull(namespace))
6869
.setIdentity(Objects.requireNonNull(identity));
6970

71+
pollRequestBuilder.setWorkerInstanceKey(workerInstanceKey);
72+
7073
if (versioningOptions.getWorkerDeploymentOptions() != null) {
7174
pollRequestBuilder.setDeploymentOptions(
7275
WorkerVersioningProtoUtils.deploymentOptionsToProto(

temporal-sdk/src/main/java/io/temporal/internal/worker/BasePoller.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,14 @@ abstract class BasePoller<T> implements SuspendableWorker {
2727

2828
protected ExecutorService pollExecutor;
2929

30-
protected BasePoller(ShutdownableTaskExecutor<T> taskExecutor) {
30+
protected final NamespaceCapabilities namespaceCapabilities;
31+
32+
protected BasePoller(
33+
ShutdownableTaskExecutor<T> taskExecutor, NamespaceCapabilities namespaceCapabilities) {
3134
Objects.requireNonNull(taskExecutor, "taskExecutor should not be null");
3235
this.taskExecutor = taskExecutor;
36+
this.namespaceCapabilities =
37+
Objects.requireNonNull(namespaceCapabilities, "namespaceCapabilities should not be null");
3338
}
3439

3540
@Override
@@ -55,15 +60,24 @@ public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean
5560
return CompletableFuture.completedFuture(null);
5661
}
5762

58-
return shutdownManager
59-
// it's ok to forcefully shutdown pollers, because they are stuck in a long poll call
60-
// so we don't risk loosing any progress doing that.
61-
.shutdownExecutorNow(pollExecutor, this + "#pollExecutor", Duration.ofSeconds(1))
62-
.exceptionally(
63-
e -> {
64-
log.error("Unexpected exception during shutdown", e);
65-
return null;
66-
});
63+
CompletableFuture<Void> pollExecutorShutdown;
64+
if (namespaceCapabilities.isGracefulPollShutdown() && !interruptTasks) {
65+
// When graceful poll shutdown is enabled, the server will complete outstanding polls with
66+
// empty responses after ShutdownWorker is called. We simply wait for polls to return.
67+
pollExecutorShutdown =
68+
shutdownManager.shutdownExecutor(
69+
pollExecutor, this + "#pollExecutor", Duration.ofSeconds(80));
70+
} else {
71+
// ShutdownNow and old servers forcibly stop outstanding polls.
72+
pollExecutorShutdown =
73+
shutdownManager.shutdownExecutorNow(
74+
pollExecutor, this + "#pollExecutor", Duration.ofSeconds(1));
75+
}
76+
return pollExecutorShutdown.exceptionally(
77+
e -> {
78+
log.error("Unexpected exception during shutdown", e);
79+
return null;
80+
});
6781
}
6882

6983
@Override

temporal-sdk/src/main/java/io/temporal/internal/worker/MultiThreadedPoller.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,9 @@ public MultiThreadedPoller(
5252
PollTask<T> pollTask,
5353
ShutdownableTaskExecutor<T> taskExecutor,
5454
PollerOptions pollerOptions,
55-
Scope workerMetricsScope) {
56-
super(taskExecutor);
55+
Scope workerMetricsScope,
56+
NamespaceCapabilities namespaceCapabilities) {
57+
super(taskExecutor, namespaceCapabilities);
5758
Objects.requireNonNull(identity, "identity cannot be null");
5859
Objects.requireNonNull(pollTask, "poll service should not be null");
5960
Objects.requireNonNull(pollerOptions, "pollerOptions should not be null");

temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.temporal.internal.worker;
22

3+
import io.temporal.api.namespace.v1.NamespaceInfo.Capabilities;
34
import java.util.concurrent.atomic.AtomicBoolean;
45

56
/**
@@ -9,14 +10,31 @@
910
*/
1011
public final class NamespaceCapabilities {
1112
private final AtomicBoolean pollerAutoscaling = new AtomicBoolean(false);
13+
private final AtomicBoolean gracefulPollShutdown = new AtomicBoolean(false);
1214
private final AtomicBoolean workerHeartbeats = new AtomicBoolean(false);
1315

16+
public void setFromCapabilities(Capabilities capabilities) {
17+
if (capabilities.getPollerAutoscaling()) {
18+
pollerAutoscaling.set(true);
19+
}
20+
if (capabilities.getWorkerPollCompleteOnShutdown()) {
21+
gracefulPollShutdown.set(true);
22+
}
23+
if (capabilities.getWorkerHeartbeats()) {
24+
workerHeartbeats.set(true);
25+
}
26+
}
27+
1428
public boolean isPollerAutoscaling() {
1529
return pollerAutoscaling.get();
1630
}
1731

18-
public void setPollerAutoscaling(boolean value) {
19-
pollerAutoscaling.set(value);
32+
public boolean isGracefulPollShutdown() {
33+
return gracefulPollShutdown.get();
34+
}
35+
36+
public void setGracefulPollShutdown(boolean value) {
37+
gracefulPollShutdown.set(value);
2038
}
2139

2240
public boolean isWorkerHeartbeats() {

0 commit comments

Comments
 (0)