Skip to content

Commit 71fec71

Browse files
yuandrewdonald-pinckney
authored andcommitted
Worker Heartbeating (#2818)
* Add worker heartbeat support Implements periodic worker heartbeat RPCs that report worker status, slot usage, poller info, and task counters to the server. Key components: - HeartbeatManager: per-namespace scheduler that aggregates heartbeats from all workers sharing that namespace - PollerTracker: tracks in-flight poll count and last successful poll time - WorkflowClientOptions.workerHeartbeatInterval: configurable interval (default 60s, range 1-60s, negative to disable) - TrackingSlotSupplier: extended with slot type reporting - Worker: builds SharedNamespaceWorker heartbeat data from activity, workflow, and nexus worker stats - TestWorkflowService: implements recordWorkerHeartbeat, describeWorker, and shutdownWorker RPCs for testing
1 parent 78c7fd7 commit 71fec71

37 files changed

+2878
-240
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ jobs:
108108
--dynamic-config-value history.MaxBufferedQueryCount=10000 \
109109
--dynamic-config-value frontend.workerVersioningDataAPIs=true \
110110
--dynamic-config-value history.enableRequestIdRefLinks=true \
111+
--dynamic-config-value frontend.WorkerHeartbeatsEnabled=true \
112+
--dynamic-config-value frontend.ListWorkersEnabled=true \
111113
--dynamic-config-value 'component.callbacks.allowedAddresses=[{"Pattern":"localhost:7243","AllowInsecure":true}]' &
112114
sleep 10s
113115

temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.temporal.internal.client.external.ManualActivityCompletionClientFactory;
2424
import io.temporal.internal.common.PluginUtils;
2525
import io.temporal.internal.sync.StubMarker;
26+
import io.temporal.internal.worker.HeartbeatManager;
2627
import io.temporal.serviceclient.MetricsTag;
2728
import io.temporal.serviceclient.WorkflowServiceStubs;
2829
import io.temporal.serviceclient.WorkflowServiceStubsPlugin;
@@ -53,6 +54,8 @@ final class WorkflowClientInternalImpl implements WorkflowClient, WorkflowClient
5354
private final Scope metricsScope;
5455
private final WorkflowClientInterceptor[] interceptors;
5556
private final WorkerFactoryRegistry workerFactoryRegistry = new WorkerFactoryRegistry();
57+
private final String workerGroupingKey = java.util.UUID.randomUUID().toString();
58+
private final @Nullable HeartbeatManager heartbeatManager;
5659

5760
/**
5861
* Creates client that connects to an instance of the Temporal Service. Cannot be used from within
@@ -112,6 +115,14 @@ public static WorkflowClient newInstance(
112115
options.getNamespace(),
113116
options.getIdentity(),
114117
options.getDataConverter());
118+
119+
java.time.Duration heartbeatInterval = options.getWorkerHeartbeatInterval();
120+
if (!heartbeatInterval.isNegative()) {
121+
this.heartbeatManager =
122+
new HeartbeatManager(workflowServiceStubs, options.getIdentity(), heartbeatInterval);
123+
} else {
124+
this.heartbeatManager = null;
125+
}
115126
}
116127

117128
private WorkflowClientCallsInterceptor initializeClientInvoker() {
@@ -788,6 +799,20 @@ public void registerWorkerFactory(WorkerFactory workerFactory) {
788799
@Override
789800
public void deregisterWorkerFactory(WorkerFactory workerFactory) {
790801
workerFactoryRegistry.deregister(workerFactory);
802+
if (workerFactoryRegistry.isEmpty() && heartbeatManager != null) {
803+
heartbeatManager.shutdown();
804+
}
805+
}
806+
807+
@Override
808+
public String getWorkerGroupingKey() {
809+
return workerGroupingKey;
810+
}
811+
812+
@Override
813+
@Nullable
814+
public HeartbeatManager getHeartbeatManager() {
815+
return heartbeatManager;
791816
}
792817

793818
@Override

temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package io.temporal.client;
22

3+
import com.google.common.base.Preconditions;
34
import io.temporal.api.enums.v1.QueryRejectCondition;
45
import io.temporal.common.Experimental;
56
import io.temporal.common.context.ContextPropagator;
67
import io.temporal.common.converter.DataConverter;
78
import io.temporal.common.converter.GlobalDataConverter;
89
import io.temporal.common.interceptors.WorkflowClientInterceptor;
910
import java.lang.management.ManagementFactory;
11+
import java.time.Duration;
1012
import java.util.Arrays;
1113
import java.util.Collections;
1214
import java.util.List;
@@ -49,6 +51,7 @@ public static final class Builder {
4951
private List<ContextPropagator> contextPropagators;
5052
private QueryRejectCondition queryRejectCondition;
5153
private WorkflowClientPlugin[] plugins;
54+
private Duration workerHeartbeatInterval;
5255

5356
private Builder() {}
5457

@@ -64,6 +67,7 @@ private Builder(WorkflowClientOptions options) {
6467
contextPropagators = options.contextPropagators;
6568
queryRejectCondition = options.queryRejectCondition;
6669
plugins = options.plugins;
70+
workerHeartbeatInterval = options.workerHeartbeatInterval;
6771
}
6872

6973
public Builder setNamespace(String namespace) {
@@ -153,6 +157,19 @@ public Builder setPlugins(WorkflowClientPlugin... plugins) {
153157
return this;
154158
}
155159

160+
/**
161+
* Sets the interval at which workers send heartbeat RPCs to the server. If not set or set to
162+
* zero, defaults to 60 seconds. A negative duration disables heartbeating. Positive values must
163+
* be between 1 and 60 seconds inclusive.
164+
*
165+
* @param workerHeartbeatInterval the heartbeat interval, or a negative duration to disable
166+
*/
167+
@Experimental
168+
public Builder setWorkerHeartbeatInterval(Duration workerHeartbeatInterval) {
169+
this.workerHeartbeatInterval = workerHeartbeatInterval;
170+
return this;
171+
}
172+
156173
public WorkflowClientOptions build() {
157174
return new WorkflowClientOptions(
158175
namespace,
@@ -162,7 +179,8 @@ public WorkflowClientOptions build() {
162179
binaryChecksum,
163180
contextPropagators,
164181
queryRejectCondition,
165-
plugins == null ? EMPTY_PLUGINS : plugins);
182+
plugins == null ? EMPTY_PLUGINS : plugins,
183+
resolveHeartbeatInterval(workerHeartbeatInterval));
166184
}
167185

168186
/**
@@ -188,7 +206,22 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {
188206
queryRejectCondition == null
189207
? QueryRejectCondition.QUERY_REJECT_CONDITION_UNSPECIFIED
190208
: queryRejectCondition,
191-
plugins == null ? EMPTY_PLUGINS : plugins);
209+
plugins == null ? EMPTY_PLUGINS : plugins,
210+
resolveHeartbeatInterval(workerHeartbeatInterval));
211+
}
212+
213+
private static Duration resolveHeartbeatInterval(Duration raw) {
214+
if (raw == null || raw.isZero()) {
215+
return Duration.ofSeconds(60);
216+
}
217+
if (raw.isNegative()) {
218+
return raw;
219+
}
220+
Preconditions.checkArgument(
221+
raw.compareTo(Duration.ofSeconds(1)) >= 0 && raw.compareTo(Duration.ofSeconds(60)) <= 0,
222+
"workerHeartbeatInterval must be between 1s and 60s, got %s",
223+
raw);
224+
return raw;
192225
}
193226
}
194227

@@ -215,6 +248,8 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {
215248

216249
private final WorkflowClientPlugin[] plugins;
217250

251+
private final Duration workerHeartbeatInterval;
252+
218253
private WorkflowClientOptions(
219254
String namespace,
220255
DataConverter dataConverter,
@@ -223,7 +258,8 @@ private WorkflowClientOptions(
223258
String binaryChecksum,
224259
List<ContextPropagator> contextPropagators,
225260
QueryRejectCondition queryRejectCondition,
226-
WorkflowClientPlugin[] plugins) {
261+
WorkflowClientPlugin[] plugins,
262+
Duration workerHeartbeatInterval) {
227263
this.namespace = namespace;
228264
this.dataConverter = dataConverter;
229265
this.interceptors = interceptors;
@@ -232,6 +268,7 @@ private WorkflowClientOptions(
232268
this.contextPropagators = contextPropagators;
233269
this.queryRejectCondition = queryRejectCondition;
234270
this.plugins = plugins;
271+
this.workerHeartbeatInterval = workerHeartbeatInterval;
235272
}
236273

237274
/**
@@ -289,6 +326,15 @@ public WorkflowClientPlugin[] getPlugins() {
289326
return plugins;
290327
}
291328

329+
/**
330+
* Returns the worker heartbeat interval. Defaults to 60 seconds if not configured. A negative
331+
* duration means heartbeating is explicitly disabled.
332+
*/
333+
@Experimental
334+
public Duration getWorkerHeartbeatInterval() {
335+
return workerHeartbeatInterval;
336+
}
337+
292338
@Override
293339
public String toString() {
294340
return "WorkflowClientOptions{"
@@ -311,6 +357,8 @@ public String toString() {
311357
+ queryRejectCondition
312358
+ ", plugins="
313359
+ Arrays.toString(plugins)
360+
+ ", workerHeartbeatInterval="
361+
+ workerHeartbeatInterval
314362
+ '}';
315363
}
316364

@@ -326,7 +374,9 @@ public boolean equals(Object o) {
326374
&& com.google.common.base.Objects.equal(binaryChecksum, that.binaryChecksum)
327375
&& com.google.common.base.Objects.equal(contextPropagators, that.contextPropagators)
328376
&& queryRejectCondition == that.queryRejectCondition
329-
&& Arrays.equals(plugins, that.plugins);
377+
&& Arrays.equals(plugins, that.plugins)
378+
&& com.google.common.base.Objects.equal(
379+
workerHeartbeatInterval, that.workerHeartbeatInterval);
330380
}
331381

332382
@Override
@@ -339,6 +389,7 @@ public int hashCode() {
339389
binaryChecksum,
340390
contextPropagators,
341391
queryRejectCondition,
342-
Arrays.hashCode(plugins));
392+
Arrays.hashCode(plugins),
393+
workerHeartbeatInterval);
343394
}
344395
}

temporal-sdk/src/main/java/io/temporal/internal/client/WorkerFactoryRegistry.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,8 @@ public void register(WorkerFactory workerFactory) {
2626
public void deregister(WorkerFactory workerFactory) {
2727
workerFactories.remove(workerFactory);
2828
}
29+
30+
public boolean isEmpty() {
31+
return workerFactories.isEmpty();
32+
}
2933
}

temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientInternal.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package io.temporal.internal.client;
22

33
import io.temporal.client.WorkflowClient;
4+
import io.temporal.internal.worker.HeartbeatManager;
45
import io.temporal.worker.WorkerFactory;
56
import io.temporal.workflow.Functions;
7+
import javax.annotation.Nullable;
68

79
/**
810
* From OOP point of view, there is no reason for this interface not to extend {@link
@@ -18,4 +20,9 @@ public interface WorkflowClientInternal {
1820
void deregisterWorkerFactory(WorkerFactory workerFactory);
1921

2022
NexusStartWorkflowResponse startNexus(NexusStartWorkflowRequest request, Functions.Proc workflow);
23+
24+
String getWorkerGroupingKey();
25+
26+
@Nullable
27+
HeartbeatManager getHeartbeatManager();
2128
}

temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ public NexusTaskHandlerImpl(
6464
this.nexusServiceInterceptor = new TemporalInterceptorMiddleware(interceptors);
6565
}
6666

67+
public boolean isAnyTypeSupported() {
68+
return !serviceImplInstances.isEmpty();
69+
}
70+
6771
@Override
6872
public boolean start() {
6973
if (serviceImplInstances.isEmpty()) {

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import io.temporal.worker.PollerTypeMetricsTag;
1818
import io.temporal.worker.tuning.*;
1919
import java.util.Objects;
20-
import java.util.concurrent.atomic.AtomicInteger;
2120
import java.util.function.Supplier;
2221
import javax.annotation.Nonnull;
2322
import org.slf4j.Logger;
@@ -30,7 +29,7 @@ final class ActivityPollTask implements MultiThreadedPoller.PollTask<ActivityTas
3029
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
3130
private final Scope metricsScope;
3231
private final PollActivityTaskQueueRequest pollRequest;
33-
private final AtomicInteger pollGauge = new AtomicInteger();
32+
private final PollerTracker pollerTracker;
3433

3534
@SuppressWarnings("deprecation")
3635
public ActivityPollTask(
@@ -42,10 +41,12 @@ public ActivityPollTask(
4241
double activitiesPerSecond,
4342
@Nonnull TrackingSlotSupplier<ActivitySlotInfo> slotSupplier,
4443
@Nonnull Scope metricsScope,
45-
@Nonnull Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities) {
44+
@Nonnull Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities,
45+
@Nonnull PollerTracker pollerTracker) {
4646
this.service = Objects.requireNonNull(service);
4747
this.slotSupplier = slotSupplier;
4848
this.metricsScope = Objects.requireNonNull(metricsScope);
49+
this.pollerTracker = Objects.requireNonNull(pollerTracker);
4950

5051
PollActivityTaskQueueRequest.Builder pollRequest =
5152
PollActivityTaskQueueRequest.newBuilder()
@@ -100,7 +101,7 @@ public ActivityTask poll() {
100101

101102
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK)
102103
.gauge(MetricsType.NUM_POLLERS)
103-
.update(pollGauge.incrementAndGet());
104+
.update(pollerTracker.pollStarted());
104105

105106
try {
106107
response =
@@ -119,14 +120,15 @@ public ActivityTask poll() {
119120
ProtobufTimeUtils.toM3Duration(
120121
response.getStartedTime(), response.getCurrentAttemptScheduledTime()));
121122
isSuccessful = true;
123+
pollerTracker.pollSucceeded();
122124
return new ActivityTask(
123125
response,
124126
permit,
125127
() -> slotSupplier.releaseSlot(SlotReleaseReason.taskComplete(), permit));
126128
} finally {
127129
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK)
128130
.gauge(MetricsType.NUM_POLLERS)
129-
.update(pollGauge.decrementAndGet());
131+
.update(pollerTracker.pollCompleted());
130132

131133
if (!isSuccessful) slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit);
132134
}

0 commit comments

Comments
 (0)