Skip to content

Commit 8ae11f5

Browse files
committed
Prevent task loss on shutdown when server is capable
1 parent c195cd1 commit 8ae11f5

23 files changed

+260
-133
lines changed

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public ActivityPollTask(
3838
@Nonnull String namespace,
3939
@Nonnull String taskQueue,
4040
@Nonnull String identity,
41+
@Nonnull String workerInstanceKey,
4142
@Nonnull WorkerVersioningOptions versioningOptions,
4243
double activitiesPerSecond,
4344
@Nonnull TrackingSlotSupplier<ActivitySlotInfo> slotSupplier,
@@ -52,6 +53,7 @@ public ActivityPollTask(
5253
.setNamespace(namespace)
5354
.setIdentity(identity)
5455
.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue));
56+
pollRequest.setWorkerInstanceKey(workerInstanceKey);
5557
if (activitiesPerSecond > 0) {
5658
pollRequest.setTaskQueueMetadata(
5759
TaskQueueMetadata.newBuilder()

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.Optional;
2727
import java.util.concurrent.CompletableFuture;
2828
import java.util.concurrent.TimeUnit;
29-
import java.util.concurrent.atomic.AtomicBoolean;
3029
import javax.annotation.Nonnull;
3130
import org.slf4j.Logger;
3231
import org.slf4j.LoggerFactory;
@@ -49,7 +48,7 @@ final class ActivityWorker implements SuspendableWorker {
4948
private final GrpcRetryer grpcRetryer;
5049
private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
5150
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
52-
private final AtomicBoolean serverSupportsAutoscaling;
51+
private final NamespaceCapabilities namespaceCapabilities;
5352

5453
public ActivityWorker(
5554
@Nonnull WorkflowServiceStubs service,
@@ -59,7 +58,7 @@ public ActivityWorker(
5958
@Nonnull SingleWorkerOptions options,
6059
@Nonnull ActivityTaskHandler handler,
6160
@Nonnull SlotSupplier<ActivitySlotInfo> slotSupplier,
62-
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
61+
@Nonnull NamespaceCapabilities namespaceCapabilities) {
6362
this.service = Objects.requireNonNull(service);
6463
this.namespace = Objects.requireNonNull(namespace);
6564
this.taskQueue = Objects.requireNonNull(taskQueue);
@@ -75,7 +74,7 @@ public ActivityWorker(
7574
DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);
7675

7776
this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope);
78-
this.serverSupportsAutoscaling = serverSupportsAutoscaling;
77+
this.namespaceCapabilities = namespaceCapabilities;
7978
}
8079

8180
@Override
@@ -103,14 +102,15 @@ public boolean start() {
103102
namespace,
104103
taskQueue,
105104
options.getIdentity(),
105+
options.getWorkerInstanceKey(),
106106
options.getWorkerVersioningOptions(),
107107
taskQueueActivitiesPerSecond,
108108
this.slotSupplier,
109109
workerMetricsScope,
110110
service.getServerCapabilities()),
111111
this.pollTaskExecutor,
112112
pollerOptions,
113-
serverSupportsAutoscaling.get(),
113+
namespaceCapabilities,
114114
workerMetricsScope);
115115

116116
} else {
@@ -122,14 +122,16 @@ public boolean start() {
122122
namespace,
123123
taskQueue,
124124
options.getIdentity(),
125+
options.getWorkerInstanceKey(),
125126
options.getWorkerVersioningOptions(),
126127
taskQueueActivitiesPerSecond,
127128
this.slotSupplier,
128129
workerMetricsScope,
129130
service.getServerCapabilities()),
130131
this.pollTaskExecutor,
131132
pollerOptions,
132-
workerMetricsScope);
133+
workerMetricsScope,
134+
namespaceCapabilities);
133135
}
134136
poller.start();
135137
workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);

temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncActivityPollTask.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public AsyncActivityPollTask(
4343
@Nonnull String namespace,
4444
@Nonnull String taskQueue,
4545
@Nonnull String identity,
46+
@Nonnull String workerInstanceKey,
4647
@Nonnull WorkerVersioningOptions versioningOptions,
4748
double activitiesPerSecond,
4849
@Nonnull TrackingSlotSupplier<ActivitySlotInfo> slotSupplier,
@@ -57,6 +58,7 @@ public AsyncActivityPollTask(
5758
.setNamespace(namespace)
5859
.setIdentity(identity)
5960
.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue));
61+
pollRequest.setWorkerInstanceKey(workerInstanceKey);
6062
if (activitiesPerSecond > 0) {
6163
pollRequest.setTaskQueueMetadata(
6264
TaskQueueMetadata.newBuilder()

temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncNexusPollTask.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public AsyncNexusPollTask(
4242
@Nonnull String namespace,
4343
@Nonnull String taskQueue,
4444
@Nonnull String identity,
45+
@Nonnull String workerInstanceKey,
4546
@Nonnull WorkerVersioningOptions versioningOptions,
4647
@Nonnull Scope metricsScope,
4748
@Nonnull Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities,
@@ -56,6 +57,8 @@ public AsyncNexusPollTask(
5657
.setIdentity(identity)
5758
.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue));
5859

60+
pollRequest.setWorkerInstanceKey(workerInstanceKey);
61+
5962
if (versioningOptions.getWorkerDeploymentOptions() != null) {
6063
pollRequest.setDeploymentOptions(
6164
WorkerVersioningProtoUtils.deploymentOptionsToProto(

temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncPoller.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
2929
private final List<PollTaskAsync<T>> asyncTaskPollers;
3030
private final PollerOptions pollerOptions;
3131
private final PollerBehaviorAutoscaling pollerBehavior;
32-
private final boolean serverSupportsAutoscaling;
3332
private final Scope workerMetricsScope;
3433
private Throttler pollRateThrottler;
3534
private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
@@ -43,15 +42,15 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
4342
PollTaskAsync<T> asyncTaskPoller,
4443
ShutdownableTaskExecutor<T> taskExecutor,
4544
PollerOptions pollerOptions,
46-
boolean serverSupportsAutoscaling,
45+
NamespaceCapabilities namespaceCapabilities,
4746
Scope workerMetricsScope) {
4847
this(
4948
slotSupplier,
5049
slotReservationData,
5150
Collections.singletonList(asyncTaskPoller),
5251
taskExecutor,
5352
pollerOptions,
54-
serverSupportsAutoscaling,
53+
namespaceCapabilities,
5554
workerMetricsScope);
5655
}
5756

@@ -61,9 +60,9 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
6160
List<PollTaskAsync<T>> asyncTaskPollers,
6261
ShutdownableTaskExecutor<T> taskExecutor,
6362
PollerOptions pollerOptions,
64-
boolean serverSupportsAutoscaling,
63+
NamespaceCapabilities namespaceCapabilities,
6564
Scope workerMetricsScope) {
66-
super(taskExecutor);
65+
super(taskExecutor, namespaceCapabilities);
6766
Objects.requireNonNull(slotSupplier, "slotSupplier cannot be null");
6867
Objects.requireNonNull(slotReservationData, "slotReservation data should not be null");
6968
Objects.requireNonNull(asyncTaskPollers, "asyncTaskPollers should not be null");
@@ -82,7 +81,6 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
8281
+ " is not supported for AsyncPoller. Only PollerBehaviorAutoscaling is supported.");
8382
}
8483
this.pollerBehavior = (PollerBehaviorAutoscaling) pollerOptions.getPollerBehavior();
85-
this.serverSupportsAutoscaling = serverSupportsAutoscaling;
8684
this.pollerOptions = pollerOptions;
8785
this.workerMetricsScope = workerMetricsScope;
8886
}
@@ -114,7 +112,7 @@ public boolean start() {
114112
pollerBehavior.getMinConcurrentTaskPollers(),
115113
pollerBehavior.getMaxConcurrentTaskPollers(),
116114
pollerBehavior.getInitialConcurrentTaskPollers(),
117-
serverSupportsAutoscaling,
115+
namespaceCapabilities.isPollerAutoscaling(),
118116
(newTarget) -> {
119117
log.debug(
120118
"Updating maximum number of pollers for {} to: {}",
@@ -136,12 +134,14 @@ public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean
136134
return super.shutdown(shutdownManager, interruptTasks)
137135
.thenApply(
138136
(f) -> {
139-
for (PollTaskAsync<T> asyncTaskPoller : asyncTaskPollers) {
140-
try {
141-
log.debug("Shutting down async poller: {}", asyncTaskPoller.getLabel());
142-
asyncTaskPoller.cancel(new RuntimeException("Shutting down poller"));
143-
} catch (Throwable e) {
144-
log.error("Error while cancelling poll task", e);
137+
if (!namespaceCapabilities.isGracefulPollShutdown()) {
138+
for (PollTaskAsync<T> asyncTaskPoller : asyncTaskPollers) {
139+
try {
140+
log.debug("Shutting down async poller: {}", asyncTaskPoller.getLabel());
141+
asyncTaskPoller.cancel(new RuntimeException("Shutting down poller"));
142+
} catch (Throwable e) {
143+
log.error("Error while cancelling poll task", e);
144+
}
145145
}
146146
}
147147
return null;

temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncWorkflowPollTask.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public AsyncWorkflowPollTask(
5353
@Nonnull String taskQueue,
5454
@Nullable String stickyTaskQueue,
5555
@Nonnull String identity,
56+
@Nonnull String workerInstanceKey,
5657
@Nonnull WorkerVersioningOptions versioningOptions,
5758
@Nonnull TrackingSlotSupplier<WorkflowSlotInfo> slotSupplier,
5859
@Nonnull Scope metricsScope,
@@ -66,6 +67,8 @@ public AsyncWorkflowPollTask(
6667
.setNamespace(Objects.requireNonNull(namespace))
6768
.setIdentity(Objects.requireNonNull(identity));
6869

70+
pollRequestBuilder.setWorkerInstanceKey(workerInstanceKey);
71+
6972
if (versioningOptions.getWorkerDeploymentOptions() != null) {
7073
pollRequestBuilder.setDeploymentOptions(
7174
WorkerVersioningProtoUtils.deploymentOptionsToProto(

temporal-sdk/src/main/java/io/temporal/internal/worker/BasePoller.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,14 @@ abstract class BasePoller<T> implements SuspendableWorker {
2727

2828
protected ExecutorService pollExecutor;
2929

30-
protected BasePoller(ShutdownableTaskExecutor<T> taskExecutor) {
30+
protected final NamespaceCapabilities namespaceCapabilities;
31+
32+
protected BasePoller(
33+
ShutdownableTaskExecutor<T> taskExecutor, NamespaceCapabilities namespaceCapabilities) {
3134
Objects.requireNonNull(taskExecutor, "taskExecutor should not be null");
3235
this.taskExecutor = taskExecutor;
36+
this.namespaceCapabilities =
37+
Objects.requireNonNull(namespaceCapabilities, "namespaceCapabilities should not be null");
3338
}
3439

3540
@Override
@@ -55,15 +60,23 @@ public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean
5560
return CompletableFuture.completedFuture(null);
5661
}
5762

58-
return shutdownManager
59-
// it's ok to forcefully shut down pollers, because they are stuck in a long poll call
60-
// so we don't risk losing any progress doing that.
61-
.shutdownExecutorNow(pollExecutor, this + "#pollExecutor", Duration.ofSeconds(1))
62-
.exceptionally(
63-
e -> {
64-
log.error("Unexpected exception during shutdown", e);
65-
return null;
66-
});
63+
CompletableFuture<Void> pollExecutorShutdown;
64+
if (namespaceCapabilities.isGracefulPollShutdown()) {
65+
// When graceful poll shutdown is enabled, the server will complete outstanding polls with
66+
// empty responses after ShutdownWorker is called. We simply wait for polls to return.
67+
pollExecutorShutdown =
68+
shutdownManager.shutdownExecutorUntimed(pollExecutor, this + "#pollExecutor");
69+
} else {
70+
// Old behaviour forcibly stops outstanding polls.
71+
pollExecutorShutdown =
72+
shutdownManager.shutdownExecutorNow(
73+
pollExecutor, this + "#pollExecutor", Duration.ofSeconds(1));
74+
}
75+
return pollExecutorShutdown.exceptionally(
76+
e -> {
77+
log.error("Unexpected exception during shutdown", e);
78+
return null;
79+
});
6780
}
6881

6982
@Override

temporal-sdk/src/main/java/io/temporal/internal/worker/MultiThreadedPoller.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,9 @@ public MultiThreadedPoller(
5252
PollTask<T> pollTask,
5353
ShutdownableTaskExecutor<T> taskExecutor,
5454
PollerOptions pollerOptions,
55-
Scope workerMetricsScope) {
56-
super(taskExecutor);
55+
Scope workerMetricsScope,
56+
NamespaceCapabilities namespaceCapabilities) {
57+
super(taskExecutor, namespaceCapabilities);
5758
Objects.requireNonNull(identity, "identity cannot be null");
5859
Objects.requireNonNull(pollTask, "poll service should not be null");
5960
Objects.requireNonNull(pollerOptions, "pollerOptions should not be null");
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package io.temporal.internal.worker;
2+
3+
import java.util.concurrent.atomic.AtomicBoolean;
4+
5+
/**
6+
* Holds namespace-level capabilities discovered from the server's DescribeNamespace response. A
7+
* single instance is shared across all workers in a WorkerFactory and is populated at startup. Uses
8+
* AtomicBooleans so capabilities can be set after construction.
9+
*/
10+
public final class NamespaceCapabilities {
11+
private final AtomicBoolean pollerAutoscaling = new AtomicBoolean(false);
12+
private final AtomicBoolean gracefulPollShutdown = new AtomicBoolean(false);
13+
14+
public boolean isPollerAutoscaling() {
15+
return pollerAutoscaling.get();
16+
}
17+
18+
public void setPollerAutoscaling(boolean value) {
19+
pollerAutoscaling.set(value);
20+
}
21+
22+
public boolean isGracefulPollShutdown() {
23+
return gracefulPollShutdown.get();
24+
}
25+
26+
public void setGracefulPollShutdown(boolean value) {
27+
gracefulPollShutdown.set(value);
28+
}
29+
}

temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public NexusPollTask(
3535
@Nonnull String namespace,
3636
@Nonnull String taskQueue,
3737
@Nonnull String identity,
38+
@Nonnull String workerInstanceKey,
3839
@Nonnull WorkerVersioningOptions versioningOptions,
3940
@Nonnull TrackingSlotSupplier<NexusSlotInfo> slotSupplier,
4041
@Nonnull Scope metricsScope,
@@ -48,6 +49,7 @@ public NexusPollTask(
4849
.setNamespace(namespace)
4950
.setIdentity(identity)
5051
.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue));
52+
pollRequest.setWorkerInstanceKey(workerInstanceKey);
5153

5254
if (versioningOptions.getWorkerDeploymentOptions() != null) {
5355
pollRequest.setDeploymentOptions(

0 commit comments

Comments
 (0)