Skip to content

Commit bfd3fc6

Browse files
committed
Add worker heartbeat support
Implements periodic worker heartbeat RPCs that report worker status, slot usage, poller info, and task counters to the server. Key components: - HeartbeatManager: per-namespace scheduler that aggregates heartbeats from all workers sharing that namespace - PollerTracker: tracks in-flight poll count and last successful poll time - WorkflowClientOptions.workerHeartbeatInterval: configurable interval (default 60s, range 1-60s, negative to disable) - TrackingSlotSupplier: extended with slot type reporting - Worker: builds SharedNamespaceWorker heartbeat data from activity, workflow, and nexus worker stats - TestWorkflowService: implements recordWorkerHeartbeat, describeWorker, and shutdownWorker RPCs for testing
1 parent c195cd1 commit bfd3fc6

32 files changed

+2485
-79
lines changed

temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.temporal.internal.client.external.ManualActivityCompletionClientFactory;
2424
import io.temporal.internal.common.PluginUtils;
2525
import io.temporal.internal.sync.StubMarker;
26+
import io.temporal.internal.worker.HeartbeatManager;
2627
import io.temporal.serviceclient.MetricsTag;
2728
import io.temporal.serviceclient.WorkflowServiceStubs;
2829
import io.temporal.serviceclient.WorkflowServiceStubsPlugin;
@@ -53,6 +54,8 @@ final class WorkflowClientInternalImpl implements WorkflowClient, WorkflowClient
5354
private final Scope metricsScope;
5455
private final WorkflowClientInterceptor[] interceptors;
5556
private final WorkerFactoryRegistry workerFactoryRegistry = new WorkerFactoryRegistry();
57+
private final String workerGroupingKey = java.util.UUID.randomUUID().toString();
58+
private final @Nullable HeartbeatManager heartbeatManager;
5659

5760
/**
5861
* Creates client that connects to an instance of the Temporal Service. Cannot be used from within
@@ -112,6 +115,14 @@ public static WorkflowClient newInstance(
112115
options.getNamespace(),
113116
options.getIdentity(),
114117
options.getDataConverter());
118+
119+
java.time.Duration heartbeatInterval = options.getWorkerHeartbeatInterval();
120+
if (!heartbeatInterval.isNegative()) {
121+
this.heartbeatManager =
122+
new HeartbeatManager(workflowServiceStubs, options.getIdentity(), heartbeatInterval);
123+
} else {
124+
this.heartbeatManager = null;
125+
}
115126
}
116127

117128
private WorkflowClientCallsInterceptor initializeClientInvoker() {
@@ -790,6 +801,17 @@ public void deregisterWorkerFactory(WorkerFactory workerFactory) {
790801
workerFactoryRegistry.deregister(workerFactory);
791802
}
792803

804+
@Override
805+
public String getWorkerGroupingKey() {
806+
return workerGroupingKey;
807+
}
808+
809+
@Override
810+
@Nullable
811+
public HeartbeatManager getHeartbeatManager() {
812+
return heartbeatManager;
813+
}
814+
793815
@Override
794816
public NexusStartWorkflowResponse startNexus(
795817
NexusStartWorkflowRequest request, Functions.Proc workflow) {

temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package io.temporal.client;
22

3+
import com.google.common.base.Preconditions;
34
import io.temporal.api.enums.v1.QueryRejectCondition;
45
import io.temporal.common.Experimental;
56
import io.temporal.common.context.ContextPropagator;
67
import io.temporal.common.converter.DataConverter;
78
import io.temporal.common.converter.GlobalDataConverter;
89
import io.temporal.common.interceptors.WorkflowClientInterceptor;
910
import java.lang.management.ManagementFactory;
11+
import java.time.Duration;
1012
import java.util.Arrays;
1113
import java.util.Collections;
1214
import java.util.List;
@@ -49,6 +51,7 @@ public static final class Builder {
4951
private List<ContextPropagator> contextPropagators;
5052
private QueryRejectCondition queryRejectCondition;
5153
private WorkflowClientPlugin[] plugins;
54+
private Duration workerHeartbeatInterval;
5255

5356
private Builder() {}
5457

@@ -64,6 +67,7 @@ private Builder(WorkflowClientOptions options) {
6467
contextPropagators = options.contextPropagators;
6568
queryRejectCondition = options.queryRejectCondition;
6669
plugins = options.plugins;
70+
workerHeartbeatInterval = options.workerHeartbeatInterval;
6771
}
6872

6973
public Builder setNamespace(String namespace) {
@@ -153,6 +157,19 @@ public Builder setPlugins(WorkflowClientPlugin... plugins) {
153157
return this;
154158
}
155159

160+
/**
161+
* Sets the interval at which workers send heartbeat RPCs to the server. If not set or set to
162+
* zero, defaults to 60 seconds. A negative duration disables heartbeating. Positive values must
163+
* be between 1 and 60 seconds inclusive.
164+
*
165+
* @param workerHeartbeatInterval the heartbeat interval, or a negative duration to disable
166+
*/
167+
@Experimental
168+
public Builder setWorkerHeartbeatInterval(Duration workerHeartbeatInterval) {
169+
this.workerHeartbeatInterval = workerHeartbeatInterval;
170+
return this;
171+
}
172+
156173
public WorkflowClientOptions build() {
157174
return new WorkflowClientOptions(
158175
namespace,
@@ -162,7 +179,8 @@ public WorkflowClientOptions build() {
162179
binaryChecksum,
163180
contextPropagators,
164181
queryRejectCondition,
165-
plugins == null ? EMPTY_PLUGINS : plugins);
182+
plugins == null ? EMPTY_PLUGINS : plugins,
183+
resolveHeartbeatInterval(workerHeartbeatInterval));
166184
}
167185

168186
/**
@@ -188,7 +206,22 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {
188206
queryRejectCondition == null
189207
? QueryRejectCondition.QUERY_REJECT_CONDITION_UNSPECIFIED
190208
: queryRejectCondition,
191-
plugins == null ? EMPTY_PLUGINS : plugins);
209+
plugins == null ? EMPTY_PLUGINS : plugins,
210+
resolveHeartbeatInterval(workerHeartbeatInterval));
211+
}
212+
213+
private static Duration resolveHeartbeatInterval(Duration raw) {
214+
if (raw == null || raw.isZero()) {
215+
return Duration.ofSeconds(60);
216+
}
217+
if (raw.isNegative()) {
218+
return raw;
219+
}
220+
Preconditions.checkArgument(
221+
raw.compareTo(Duration.ofSeconds(1)) >= 0 && raw.compareTo(Duration.ofSeconds(60)) <= 0,
222+
"workerHeartbeatInterval must be between 1s and 60s, got %s",
223+
raw);
224+
return raw;
192225
}
193226
}
194227

@@ -215,6 +248,8 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {
215248

216249
private final WorkflowClientPlugin[] plugins;
217250

251+
private final Duration workerHeartbeatInterval;
252+
218253
private WorkflowClientOptions(
219254
String namespace,
220255
DataConverter dataConverter,
@@ -223,7 +258,8 @@ private WorkflowClientOptions(
223258
String binaryChecksum,
224259
List<ContextPropagator> contextPropagators,
225260
QueryRejectCondition queryRejectCondition,
226-
WorkflowClientPlugin[] plugins) {
261+
WorkflowClientPlugin[] plugins,
262+
Duration workerHeartbeatInterval) {
227263
this.namespace = namespace;
228264
this.dataConverter = dataConverter;
229265
this.interceptors = interceptors;
@@ -232,6 +268,7 @@ private WorkflowClientOptions(
232268
this.contextPropagators = contextPropagators;
233269
this.queryRejectCondition = queryRejectCondition;
234270
this.plugins = plugins;
271+
this.workerHeartbeatInterval = workerHeartbeatInterval;
235272
}
236273

237274
/**
@@ -289,6 +326,15 @@ public WorkflowClientPlugin[] getPlugins() {
289326
return plugins;
290327
}
291328

329+
/**
330+
* Returns the worker heartbeat interval. Defaults to 60 seconds if not configured. A negative
331+
* duration means heartbeating is explicitly disabled.
332+
*/
333+
@Experimental
334+
public Duration getWorkerHeartbeatInterval() {
335+
return workerHeartbeatInterval;
336+
}
337+
292338
@Override
293339
public String toString() {
294340
return "WorkflowClientOptions{"
@@ -311,6 +357,8 @@ public String toString() {
311357
+ queryRejectCondition
312358
+ ", plugins="
313359
+ Arrays.toString(plugins)
360+
+ ", workerHeartbeatInterval="
361+
+ workerHeartbeatInterval
314362
+ '}';
315363
}
316364

@@ -326,7 +374,9 @@ public boolean equals(Object o) {
326374
&& com.google.common.base.Objects.equal(binaryChecksum, that.binaryChecksum)
327375
&& com.google.common.base.Objects.equal(contextPropagators, that.contextPropagators)
328376
&& queryRejectCondition == that.queryRejectCondition
329-
&& Arrays.equals(plugins, that.plugins);
377+
&& Arrays.equals(plugins, that.plugins)
378+
&& com.google.common.base.Objects.equal(
379+
workerHeartbeatInterval, that.workerHeartbeatInterval);
330380
}
331381

332382
@Override
@@ -339,6 +389,7 @@ public int hashCode() {
339389
binaryChecksum,
340390
contextPropagators,
341391
queryRejectCondition,
342-
Arrays.hashCode(plugins));
392+
Arrays.hashCode(plugins),
393+
workerHeartbeatInterval);
343394
}
344395
}

temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientInternal.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package io.temporal.internal.client;
22

33
import io.temporal.client.WorkflowClient;
4+
import io.temporal.internal.worker.HeartbeatManager;
45
import io.temporal.worker.WorkerFactory;
56
import io.temporal.workflow.Functions;
7+
import javax.annotation.Nullable;
68

79
/**
810
* From OOP point of view, there is no reason for this interface not to extend {@link
@@ -18,4 +20,9 @@ public interface WorkflowClientInternal {
1820
void deregisterWorkerFactory(WorkerFactory workerFactory);
1921

2022
NexusStartWorkflowResponse startNexus(NexusStartWorkflowRequest request, Functions.Proc workflow);
23+
24+
String getWorkerGroupingKey();
25+
26+
@Nullable
27+
HeartbeatManager getHeartbeatManager();
2128
}

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import io.temporal.worker.PollerTypeMetricsTag;
1818
import io.temporal.worker.tuning.*;
1919
import java.util.Objects;
20-
import java.util.concurrent.atomic.AtomicInteger;
2120
import java.util.function.Supplier;
2221
import javax.annotation.Nonnull;
2322
import org.slf4j.Logger;
@@ -30,7 +29,7 @@ final class ActivityPollTask implements MultiThreadedPoller.PollTask<ActivityTas
3029
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
3130
private final Scope metricsScope;
3231
private final PollActivityTaskQueueRequest pollRequest;
33-
private final AtomicInteger pollGauge = new AtomicInteger();
32+
private final PollerTracker pollerTracker;
3433

3534
@SuppressWarnings("deprecation")
3635
public ActivityPollTask(
@@ -42,10 +41,12 @@ public ActivityPollTask(
4241
double activitiesPerSecond,
4342
@Nonnull TrackingSlotSupplier<ActivitySlotInfo> slotSupplier,
4443
@Nonnull Scope metricsScope,
45-
@Nonnull Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities) {
44+
@Nonnull Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities,
45+
@Nonnull PollerTracker pollerTracker) {
4646
this.service = Objects.requireNonNull(service);
4747
this.slotSupplier = slotSupplier;
4848
this.metricsScope = Objects.requireNonNull(metricsScope);
49+
this.pollerTracker = Objects.requireNonNull(pollerTracker);
4950

5051
PollActivityTaskQueueRequest.Builder pollRequest =
5152
PollActivityTaskQueueRequest.newBuilder()
@@ -100,7 +101,7 @@ public ActivityTask poll() {
100101

101102
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK)
102103
.gauge(MetricsType.NUM_POLLERS)
103-
.update(pollGauge.incrementAndGet());
104+
.update(pollerTracker.pollStarted());
104105

105106
try {
106107
response =
@@ -119,14 +120,15 @@ public ActivityTask poll() {
119120
ProtobufTimeUtils.toM3Duration(
120121
response.getStartedTime(), response.getCurrentAttemptScheduledTime()));
121122
isSuccessful = true;
123+
pollerTracker.pollSucceeded();
122124
return new ActivityTask(
123125
response,
124126
permit,
125127
() -> slotSupplier.releaseSlot(SlotReleaseReason.taskComplete(), permit));
126128
} finally {
127129
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK)
128130
.gauge(MetricsType.NUM_POLLERS)
129-
.update(pollGauge.decrementAndGet());
131+
.update(pollerTracker.pollCompleted());
130132

131133
if (!isSuccessful) slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit);
132134
}

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.CompletableFuture;
2828
import java.util.concurrent.TimeUnit;
2929
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.concurrent.atomic.AtomicInteger;
3031
import javax.annotation.Nonnull;
3132
import org.slf4j.Logger;
3233
import org.slf4j.LoggerFactory;
@@ -49,6 +50,9 @@ final class ActivityWorker implements SuspendableWorker {
4950
private final GrpcRetryer grpcRetryer;
5051
private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
5152
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
53+
private final AtomicInteger totalProcessedTasks = new AtomicInteger();
54+
private final AtomicInteger totalFailedTasks = new AtomicInteger();
55+
private final PollerTracker pollerTracker;
5256
private final AtomicBoolean serverSupportsAutoscaling;
5357

5458
public ActivityWorker(
@@ -75,6 +79,7 @@ public ActivityWorker(
7579
DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);
7680

7781
this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope);
82+
this.pollerTracker = new PollerTracker();
7883
this.serverSupportsAutoscaling = serverSupportsAutoscaling;
7984
}
8085

@@ -107,7 +112,8 @@ public boolean start() {
107112
taskQueueActivitiesPerSecond,
108113
this.slotSupplier,
109114
workerMetricsScope,
110-
service.getServerCapabilities()),
115+
service.getServerCapabilities(),
116+
pollerTracker),
111117
this.pollTaskExecutor,
112118
pollerOptions,
113119
serverSupportsAutoscaling.get(),
@@ -126,7 +132,8 @@ public boolean start() {
126132
taskQueueActivitiesPerSecond,
127133
this.slotSupplier,
128134
workerMetricsScope,
129-
service.getServerCapabilities()),
135+
service.getServerCapabilities(),
136+
pollerTracker),
130137
this.pollTaskExecutor,
131138
pollerOptions,
132139
workerMetricsScope);
@@ -216,6 +223,26 @@ private PollerOptions getPollerOptions(SingleWorkerOptions options) {
216223
return pollerOptions;
217224
}
218225

226+
public TrackingSlotSupplier<ActivitySlotInfo> getSlotSupplier() {
227+
return slotSupplier;
228+
}
229+
230+
public AtomicInteger getTotalProcessedTasks() {
231+
return totalProcessedTasks;
232+
}
233+
234+
public AtomicInteger getTotalFailedTasks() {
235+
return totalFailedTasks;
236+
}
237+
238+
public PollerOptions getPollerOptions() {
239+
return pollerOptions;
240+
}
241+
242+
public PollerTracker getPollerTracker() {
243+
return pollerTracker;
244+
}
245+
219246
@Override
220247
public String toString() {
221248
return String.format(
@@ -261,6 +288,15 @@ public void handle(ActivityTask task) throws Exception {
261288
ActivityTaskHandler.Result result = null;
262289
try {
263290
result = handleActivity(task, metricsScope);
291+
totalProcessedTasks.incrementAndGet();
292+
if (result.getTaskFailed() != null
293+
&& !io.temporal.internal.common.FailureUtils.isBenignApplicationFailure(
294+
result.getTaskFailed().getFailure())) {
295+
totalFailedTasks.incrementAndGet();
296+
}
297+
} catch (Exception e) {
298+
totalFailedTasks.incrementAndGet();
299+
throw e;
264300
} finally {
265301
MDC.remove(LoggerTag.ACTIVITY_ID);
266302
MDC.remove(LoggerTag.ACTIVITY_TYPE);

0 commit comments

Comments
 (0)