Skip to content

Commit a714cc1

Browse files
authored
Allow poller scale-down on timeout when server supports autoscaling (#2812)
1 parent 02182b9 commit a714cc1

File tree

16 files changed

+164
-32
lines changed

16 files changed

+164
-32
lines changed

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@
2626
import java.util.Optional;
2727
import java.util.concurrent.CompletableFuture;
2828
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicBoolean;
2930
import javax.annotation.Nonnull;
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
@@ -48,6 +49,7 @@ final class ActivityWorker implements SuspendableWorker {
4849
private final GrpcRetryer grpcRetryer;
4950
private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
5051
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
52+
private final AtomicBoolean serverSupportsAutoscaling;
5153

5254
public ActivityWorker(
5355
@Nonnull WorkflowServiceStubs service,
@@ -56,7 +58,8 @@ public ActivityWorker(
5658
double taskQueueActivitiesPerSecond,
5759
@Nonnull SingleWorkerOptions options,
5860
@Nonnull ActivityTaskHandler handler,
59-
@Nonnull SlotSupplier<ActivitySlotInfo> slotSupplier) {
61+
@Nonnull SlotSupplier<ActivitySlotInfo> slotSupplier,
62+
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
6063
this.service = Objects.requireNonNull(service);
6164
this.namespace = Objects.requireNonNull(namespace);
6265
this.taskQueue = Objects.requireNonNull(taskQueue);
@@ -72,6 +75,7 @@ public ActivityWorker(
7275
DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);
7376

7477
this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope);
78+
this.serverSupportsAutoscaling = serverSupportsAutoscaling;
7579
}
7680

7781
@Override
@@ -106,6 +110,7 @@ public boolean start() {
106110
service.getServerCapabilities()),
107111
this.pollTaskExecutor,
108112
pollerOptions,
113+
serverSupportsAutoscaling.get(),
109114
workerMetricsScope);
110115

111116
} else {

temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncPoller.java

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
2929
private final List<PollTaskAsync<T>> asyncTaskPollers;
3030
private final PollerOptions pollerOptions;
3131
private final PollerBehaviorAutoscaling pollerBehavior;
32+
private final boolean serverSupportsAutoscaling;
3233
private final Scope workerMetricsScope;
3334
private Throttler pollRateThrottler;
3435
private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
@@ -42,13 +43,15 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
4243
PollTaskAsync<T> asyncTaskPoller,
4344
ShutdownableTaskExecutor<T> taskExecutor,
4445
PollerOptions pollerOptions,
46+
boolean serverSupportsAutoscaling,
4547
Scope workerMetricsScope) {
4648
this(
4749
slotSupplier,
4850
slotReservationData,
4951
Collections.singletonList(asyncTaskPoller),
5052
taskExecutor,
5153
pollerOptions,
54+
serverSupportsAutoscaling,
5255
workerMetricsScope);
5356
}
5457

@@ -58,6 +61,7 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
5861
List<PollTaskAsync<T>> asyncTaskPollers,
5962
ShutdownableTaskExecutor<T> taskExecutor,
6063
PollerOptions pollerOptions,
64+
boolean serverSupportsAutoscaling,
6165
Scope workerMetricsScope) {
6266
super(taskExecutor);
6367
Objects.requireNonNull(slotSupplier, "slotSupplier cannot be null");
@@ -78,6 +82,7 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
7882
+ " is not supported for AsyncPoller. Only PollerBehaviorAutoscaling is supported.");
7983
}
8084
this.pollerBehavior = (PollerBehaviorAutoscaling) pollerOptions.getPollerBehavior();
85+
this.serverSupportsAutoscaling = serverSupportsAutoscaling;
8186
this.pollerOptions = pollerOptions;
8287
this.workerMetricsScope = workerMetricsScope;
8388
}
@@ -109,6 +114,7 @@ public boolean start() {
109114
pollerBehavior.getMinConcurrentTaskPollers(),
110115
pollerBehavior.getMaxConcurrentTaskPollers(),
111116
pollerBehavior.getInitialConcurrentTaskPollers(),
117+
serverSupportsAutoscaling,
112118
(newTarget) -> {
113119
log.debug(
114120
"Updating maximum number of pollers for {} to: {}",

temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@
3030
import java.util.concurrent.CompletableFuture;
3131
import java.util.concurrent.TimeUnit;
3232
import java.util.concurrent.TimeoutException;
33+
import java.util.concurrent.atomic.AtomicBoolean;
3334
import javax.annotation.Nonnull;
3435
import org.slf4j.Logger;
3536
import org.slf4j.LoggerFactory;
@@ -52,6 +53,7 @@ final class NexusWorker implements SuspendableWorker {
5253
private final GrpcRetryer grpcRetryer;
5354
private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
5455
private final TrackingSlotSupplier<NexusSlotInfo> slotSupplier;
56+
private final AtomicBoolean serverSupportsAutoscaling;
5557
private final boolean forceOldFailureFormat;
5658

5759
public NexusWorker(
@@ -61,7 +63,8 @@ public NexusWorker(
6163
@Nonnull SingleWorkerOptions options,
6264
@Nonnull NexusTaskHandler handler,
6365
@Nonnull DataConverter dataConverter,
64-
@Nonnull SlotSupplier<NexusSlotInfo> slotSupplier) {
66+
@Nonnull SlotSupplier<NexusSlotInfo> slotSupplier,
67+
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
6568
this.service = Objects.requireNonNull(service);
6669
this.namespace = Objects.requireNonNull(namespace);
6770
this.taskQueue = Objects.requireNonNull(taskQueue);
@@ -77,6 +80,7 @@ public NexusWorker(
7780
DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);
7881

7982
this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope);
83+
this.serverSupportsAutoscaling = serverSupportsAutoscaling;
8084
// Allow tests to force old format for backward compatibility testing
8185
String forceOldFormat = System.getProperty("temporal.nexus.forceOldFailureFormat");
8286
this.forceOldFailureFormat = "true".equalsIgnoreCase(forceOldFormat);
@@ -112,6 +116,7 @@ public boolean start() {
112116
this.slotSupplier),
113117
this.pollTaskExecutor,
114118
pollerOptions,
119+
serverSupportsAutoscaling.get(),
115120
workerMetricsScope);
116121
} else {
117122
poller =
@@ -282,6 +287,7 @@ public Throwable wrapFailure(NexusTask task, Throwable failure) {
282287
"Failure processing nexus response: " + response.getRequest().toString(), failure);
283288
}
284289

290+
@SuppressWarnings("deprecation") // Uses deprecated operationError
285291
private void handleNexusTask(NexusTask task, Scope metricsScope) {
286292
PollNexusTaskQueueResponseOrBuilder pollResponse = task.getResponse();
287293
ByteString taskToken = pollResponse.getTaskToken();
@@ -374,6 +380,7 @@ private void logExceptionDuringResultReporting(
374380
}
375381
}
376382

383+
@SuppressWarnings("deprecation") // Uses deprecated setOperationError
377384
private Response getResponseForOldServer(Response response) {
378385
Response.Builder b = response.toBuilder();
379386
Failure failure = response.getStartOperation().getFailure();

temporal-sdk/src/main/java/io/temporal/internal/worker/PollScaleReportHandle.java

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@ public class PollScaleReportHandle<T extends ScalingTask> implements Runnable {
1818
private final int maxPollerCount;
1919
private int targetPollerCount;
2020
private final Functions.Proc1<Integer> scaleCallback;
21+
private final boolean serverSupportsAutoscaling;
2122
private boolean everSawScalingDecision;
2223
private int ingestedThisPeriod;
2324
private int ingestedLastPeriod;
@@ -27,18 +28,20 @@ public PollScaleReportHandle(
2728
int minPollerCount,
2829
int maxPollerCount,
2930
int initialPollerCount,
31+
boolean serverSupportsAutoscaling,
3032
Functions.Proc1<Integer> scaleCallback) {
3133
this.minPollerCount = minPollerCount;
3234
this.maxPollerCount = maxPollerCount;
3335
this.targetPollerCount = initialPollerCount;
36+
this.serverSupportsAutoscaling = serverSupportsAutoscaling;
3437
this.scaleCallback = scaleCallback;
3538
}
3639

3740
public synchronized void report(T task, Throwable e) {
3841
if (e != null) {
3942
// We want to avoid scaling down on errors if we have never seen a scaling decision
40-
// since we might never scale up again.
41-
if (!everSawScalingDecision) {
43+
// and the server doesn't support autoscaling - otherwise we might never scale up again.
44+
if (!everSawScalingDecision && !serverSupportsAutoscaling) {
4245
return;
4346
}
4447
if ((e instanceof StatusRuntimeException)) {
@@ -68,9 +71,10 @@ public synchronized void report(T task, Throwable e) {
6871
updateTarget((t -> t + deltaSuggestion));
6972
}
7073

71-
} else if (task == null && everSawScalingDecision) {
74+
} else if (task == null && (everSawScalingDecision || serverSupportsAutoscaling)) {
7275
// We want to avoid scaling down on empty polls if the server has never made any
73-
// scaling decisions - otherwise we might never scale up again.
76+
// scaling decisions and doesn't support autoscaling - otherwise we might never scale
77+
// up again.
7478
updateTarget((t) -> t - 1);
7579
}
7680
}

temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,8 @@
1111
import java.util.concurrent.Executors;
1212
import java.util.concurrent.ScheduledExecutorService;
1313
import java.util.concurrent.TimeUnit;
14+
import java.util.concurrent.atomic.AtomicBoolean;
15+
import javax.annotation.Nonnull;
1416
import org.slf4j.Logger;
1517
import org.slf4j.LoggerFactory;
1618

@@ -32,7 +34,8 @@ public SyncActivityWorker(
3234
String taskQueue,
3335
double taskQueueActivitiesPerSecond,
3436
SingleWorkerOptions options,
35-
SlotSupplier<ActivitySlotInfo> slotSupplier) {
37+
SlotSupplier<ActivitySlotInfo> slotSupplier,
38+
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
3639
this.identity = options.getIdentity();
3740
this.namespace = namespace;
3841
this.taskQueue = taskQueue;
@@ -72,7 +75,8 @@ public SyncActivityWorker(
7275
taskQueueActivitiesPerSecond,
7376
options,
7477
taskHandler,
75-
slotSupplier);
78+
slotSupplier,
79+
serverSupportsAutoscaling);
7680
}
7781

7882
public void registerActivityImplementations(Object... activitiesImplementation) {

temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,8 @@
66
import io.temporal.worker.tuning.SlotSupplier;
77
import java.util.concurrent.CompletableFuture;
88
import java.util.concurrent.TimeUnit;
9+
import java.util.concurrent.atomic.AtomicBoolean;
10+
import javax.annotation.Nonnull;
911
import org.slf4j.Logger;
1012
import org.slf4j.LoggerFactory;
1113

@@ -23,7 +25,8 @@ public SyncNexusWorker(
2325
String namespace,
2426
String taskQueue,
2527
SingleWorkerOptions options,
26-
SlotSupplier<NexusSlotInfo> slotSupplier) {
28+
SlotSupplier<NexusSlotInfo> slotSupplier,
29+
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
2730
this.identity = options.getIdentity();
2831
this.namespace = namespace;
2932
this.taskQueue = taskQueue;
@@ -43,7 +46,8 @@ public SyncNexusWorker(
4346
options,
4447
taskHandler,
4548
options.getDataConverter(),
46-
slotSupplier);
49+
slotSupplier,
50+
serverSupportsAutoscaling);
4751
}
4852

4953
@Override

temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@
2525
import java.util.Objects;
2626
import java.util.Optional;
2727
import java.util.concurrent.*;
28+
import java.util.concurrent.atomic.AtomicBoolean;
2829
import javax.annotation.Nonnull;
2930
import javax.annotation.Nullable;
3031
import org.slf4j.Logger;
@@ -68,7 +69,8 @@ public SyncWorkflowWorker(
6869
@Nonnull WorkflowThreadExecutor workflowThreadExecutor,
6970
@Nonnull EagerActivityDispatcher eagerActivityDispatcher,
7071
@Nonnull SlotSupplier<WorkflowSlotInfo> slotSupplier,
71-
@Nonnull SlotSupplier<LocalActivitySlotInfo> laSlotSupplier) {
72+
@Nonnull SlotSupplier<LocalActivitySlotInfo> laSlotSupplier,
73+
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
7274
this.identity = singleWorkerOptions.getIdentity();
7375
this.namespace = namespace;
7476
this.taskQueue = taskQueue;
@@ -122,7 +124,8 @@ public SyncWorkflowWorker(
122124
cache,
123125
taskHandler,
124126
eagerActivityDispatcher,
125-
slotSupplier);
127+
slotSupplier,
128+
serverSupportsAutoscaling);
126129

127130
// Exists to support Worker#replayWorkflowExecution functionality.
128131
// This handler has to be non-sticky to avoid evicting actual executions from the cache

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@
3030
import java.util.concurrent.CompletableFuture;
3131
import java.util.concurrent.RejectedExecutionException;
3232
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.atomic.AtomicBoolean;
3334
import javax.annotation.Nonnull;
3435
import javax.annotation.Nullable;
3536
import org.slf4j.Logger;
@@ -54,6 +55,7 @@ final class WorkflowWorker implements SuspendableWorker {
5455
private final GrpcRetryer grpcRetryer;
5556
private final EagerActivityDispatcher eagerActivityDispatcher;
5657
private final TrackingSlotSupplier<WorkflowSlotInfo> slotSupplier;
58+
private final AtomicBoolean serverSupportsAutoscaling;
5759

5860
private PollTaskExecutor<WorkflowTask> pollTaskExecutor;
5961

@@ -73,7 +75,8 @@ public WorkflowWorker(
7375
@Nonnull WorkflowExecutorCache cache,
7476
@Nonnull WorkflowTaskHandler handler,
7577
@Nonnull EagerActivityDispatcher eagerActivityDispatcher,
76-
@Nonnull SlotSupplier<WorkflowSlotInfo> slotSupplier) {
78+
@Nonnull SlotSupplier<WorkflowSlotInfo> slotSupplier,
79+
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
7780
this.service = Objects.requireNonNull(service);
7881
this.namespace = Objects.requireNonNull(namespace);
7982
this.taskQueue = Objects.requireNonNull(taskQueue);
@@ -88,6 +91,7 @@ public WorkflowWorker(
8891
this.grpcRetryer = new GrpcRetryer(service.getServerCapabilities());
8992
this.eagerActivityDispatcher = eagerActivityDispatcher;
9093
this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope);
94+
this.serverSupportsAutoscaling = serverSupportsAutoscaling;
9195
}
9296

9397
@Override
@@ -154,6 +158,7 @@ public boolean start() {
154158
pollers,
155159
this.pollTaskExecutor,
156160
pollerOptions,
161+
serverSupportsAutoscaling.get(),
157162
workerMetricsScope);
158163
} else {
159164
PollerBehaviorSimpleMaximum pollerBehavior =

temporal-sdk/src/main/java/io/temporal/worker/Worker.java

Lines changed: 13 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -69,7 +69,8 @@ public final class Worker {
6969
boolean useStickyTaskQueue,
7070
WorkflowThreadExecutor workflowThreadExecutor,
7171
List<ContextPropagator> contextPropagators,
72-
@Nonnull List<WorkerPlugin> plugins) {
72+
@Nonnull List<WorkerPlugin> plugins,
73+
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
7374

7475
Objects.requireNonNull(client, "client should not be null");
7576
this.plugins = Objects.requireNonNull(plugins, "plugins should not be null");
@@ -102,7 +103,8 @@ public final class Worker {
102103
taskQueue,
103104
this.options.getMaxTaskQueueActivitiesPerSecond(),
104105
activityOptions,
105-
activitySlotSupplier);
106+
activitySlotSupplier,
107+
serverSupportsAutoscaling);
106108
}
107109

108110
EagerActivityDispatcher eagerActivityDispatcher =
@@ -120,7 +122,13 @@ public final class Worker {
120122
attachMetricsToResourceController(taggedScope, nexusSlotSupplier);
121123

122124
nexusWorker =
123-
new SyncNexusWorker(client, namespace, taskQueue, nexusOptions, nexusSlotSupplier);
125+
new SyncNexusWorker(
126+
client,
127+
namespace,
128+
taskQueue,
129+
nexusOptions,
130+
nexusSlotSupplier,
131+
serverSupportsAutoscaling);
124132

125133
SingleWorkerOptions singleWorkerOptions =
126134
toWorkflowWorkerOptions(
@@ -158,7 +166,8 @@ public final class Worker {
158166
workflowThreadExecutor,
159167
eagerActivityDispatcher,
160168
workflowSlotSupplier,
161-
localActivitySlotSupplier);
169+
localActivitySlotSupplier,
170+
serverSupportsAutoscaling);
162171
}
163172

164173
/**

0 commit comments

Comments
 (0)