Skip to content

Commit dedea78

Browse files
committed
Introduce local timeout for activity heartbeats
1 parent ef19423 commit dedea78

3 files changed

Lines changed: 265 additions & 0 deletions

File tree

temporal-sdk/src/main/java/io/temporal/client/ActivityCanceledException.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ public ActivityCanceledException(ActivityInfo info) {
2222
super(info);
2323
}
2424

25+
/**
 * Creates the exception for the activity described by {@code info} with an underlying cause.
 *
 * <p>Both arguments are forwarded unchanged to the superclass constructor.
 *
 * @param info information about the activity this exception refers to
 * @param cause the throwable to chain to this exception
 */
public ActivityCanceledException(ActivityInfo info, Throwable cause) {
26+
super(info, cause);
27+
}
28+
2529
public ActivityCanceledException() {
2630
super();
2731
}

temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
import io.temporal.activity.ActivityExecutionContext;
77
import io.temporal.activity.ActivityInfo;
88
import io.temporal.api.common.v1.Payloads;
9+
import io.temporal.api.enums.v1.TimeoutType;
910
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse;
1011
import io.temporal.client.*;
1112
import io.temporal.common.converter.DataConverter;
13+
import io.temporal.failure.TimeoutFailure;
1214
import io.temporal.internal.client.ActivityClientHelper;
1315
import io.temporal.payload.context.ActivitySerializationContext;
1416
import io.temporal.serviceclient.WorkflowServiceStubs;
@@ -28,6 +30,21 @@
2830
class HeartbeatContextImpl implements HeartbeatContext {
2931
private static final Logger log = LoggerFactory.getLogger(HeartbeatContextImpl.class);
3032
private static final long HEARTBEAT_RETRY_WAIT_MILLIS = 1000;
33+
// Buffer added to the heartbeat timeout to avoid racing with the server's own timeout tracking.
34+
// Matches the default in sdk-core.
35+
private static final long DEFAULT_LOCAL_HEARTBEAT_TIMEOUT_BUFFER_MILLIS = 5000;
36+
37+
static long getLocalHeartbeatTimeoutBufferMillis() {
38+
String envVal = System.getenv("TEMPORAL_ACTIVITY_TIMEOUT_DELAY");
39+
if (envVal != null) {
40+
try {
41+
return Long.parseLong(envVal);
42+
} catch (NumberFormatException e) {
43+
log.warn("Invalid TEMPORAL_ACTIVITY_TIMEOUT_DELAY value: {}", envVal);
44+
}
45+
}
46+
return DEFAULT_LOCAL_HEARTBEAT_TIMEOUT_BUFFER_MILLIS;
47+
}
3148

3249
private final Lock lock = new ReentrantLock();
3350

@@ -42,12 +59,15 @@ class HeartbeatContextImpl implements HeartbeatContext {
4259

4360
private final Scope metricsScope;
4461
private final Optional<Payloads> prevAttemptHeartbeatDetails;
62+
private final long heartbeatTimeoutMillis;
63+
private final long localHeartbeatTimeoutBufferMillis;
4564

4665
// turned into true on a reception of the first heartbeat
4766
private boolean receivedAHeartbeat = false;
4867
private Object lastDetails;
4968
private boolean hasOutstandingHeartbeat;
5069
private ScheduledFuture<?> scheduledHeartbeat;
70+
private ScheduledFuture<?> heartbeatTimeoutFuture;
5171

5272
private ActivityCompletionException lastException;
5373

@@ -61,6 +81,30 @@ public HeartbeatContextImpl(
6181
String identity,
6282
Duration maxHeartbeatThrottleInterval,
6383
Duration defaultHeartbeatThrottleInterval) {
84+
this(
85+
service,
86+
namespace,
87+
info,
88+
dataConverter,
89+
heartbeatExecutor,
90+
metricsScope,
91+
identity,
92+
maxHeartbeatThrottleInterval,
93+
defaultHeartbeatThrottleInterval,
94+
getLocalHeartbeatTimeoutBufferMillis());
95+
}
96+
97+
HeartbeatContextImpl(
98+
WorkflowServiceStubs service,
99+
String namespace,
100+
ActivityInfo info,
101+
DataConverter dataConverter,
102+
ScheduledExecutorService heartbeatExecutor,
103+
Scope metricsScope,
104+
String identity,
105+
Duration maxHeartbeatThrottleInterval,
106+
Duration defaultHeartbeatThrottleInterval,
107+
long localHeartbeatTimeoutBufferMillis) {
64108
this.service = service;
65109
this.metricsScope = metricsScope;
66110
this.dataConverter = dataConverter;
@@ -83,6 +127,15 @@ public HeartbeatContextImpl(
83127
info.getHeartbeatTimeout(),
84128
maxHeartbeatThrottleInterval,
85129
defaultHeartbeatThrottleInterval);
130+
this.heartbeatTimeoutMillis = info.getHeartbeatTimeout().toMillis();
131+
this.localHeartbeatTimeoutBufferMillis = localHeartbeatTimeoutBufferMillis;
132+
if (this.heartbeatTimeoutMillis > 0) {
133+
this.heartbeatTimeoutFuture =
134+
heartbeatExecutor.schedule(
135+
this::onHeartbeatTimeout,
136+
heartbeatTimeoutMillis + localHeartbeatTimeoutBufferMillis,
137+
TimeUnit.MILLISECONDS);
138+
}
86139
}
87140

88141
/**
@@ -167,6 +220,10 @@ public void cancelOutstandingHeartbeat() {
167220
scheduledHeartbeat.cancel(false);
168221
scheduledHeartbeat = null;
169222
}
223+
if (heartbeatTimeoutFuture != null) {
224+
heartbeatTimeoutFuture.cancel(false);
225+
heartbeatTimeoutFuture = null;
226+
}
170227
hasOutstandingHeartbeat = false;
171228
} finally {
172229
lock.unlock();
@@ -179,6 +236,9 @@ private void doHeartBeatLocked(Object details) {
179236
sendHeartbeatRequest(details);
180237
hasOutstandingHeartbeat = false;
181238
nextHeartbeatDelay = heartbeatIntervalMillis;
239+
// Reset the local heartbeat timeout timer only on successful send.
240+
// If sends keep failing, the timer will eventually fire and cancel the activity.
241+
resetHeartbeatTimeoutLocked();
182242
} catch (StatusRuntimeException e) {
183243
// Not rethrowing to not fail activity implementation on intermittent connection or Temporal
184244
// errors.
@@ -214,6 +274,34 @@ private void scheduleNextHeartbeatLocked(long delay) {
214274
TimeUnit.MILLISECONDS);
215275
}
216276

277+
private void resetHeartbeatTimeoutLocked() {
278+
if (heartbeatTimeoutFuture != null) {
279+
heartbeatTimeoutFuture.cancel(false);
280+
heartbeatTimeoutFuture =
281+
heartbeatExecutor.schedule(
282+
this::onHeartbeatTimeout,
283+
heartbeatTimeoutMillis + localHeartbeatTimeoutBufferMillis,
284+
TimeUnit.MILLISECONDS);
285+
}
286+
}
287+
288+
private void onHeartbeatTimeout() {
289+
lock.lock();
290+
try {
291+
if (lastException == null) {
292+
log.warn(
293+
"Activity heartbeat timed out locally. ActivityId={}, activityType={}",
294+
info.getActivityId(),
295+
info.getActivityType());
296+
lastException =
297+
new ActivityCanceledException(
298+
info, new TimeoutFailure(null, null, TimeoutType.TIMEOUT_TYPE_HEARTBEAT));
299+
}
300+
} finally {
301+
lock.unlock();
302+
}
303+
}
304+
217305
private void sendHeartbeatRequest(Object details) {
218306
try {
219307
RecordActivityTaskHeartbeatResponse status =
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package io.temporal.internal.activity;
2+
3+
import static org.junit.Assert.*;
4+
import static org.mockito.ArgumentMatchers.any;
5+
import static org.mockito.Mockito.*;
6+
7+
import com.uber.m3.tally.NoopScope;
8+
import io.grpc.Status;
9+
import io.grpc.StatusRuntimeException;
10+
import io.temporal.activity.ActivityInfo;
11+
import io.temporal.api.enums.v1.TimeoutType;
12+
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse;
13+
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
14+
import io.temporal.client.ActivityCanceledException;
15+
import io.temporal.client.ActivityCompletionException;
16+
import io.temporal.common.converter.GlobalDataConverter;
17+
import io.temporal.failure.TimeoutFailure;
18+
import io.temporal.serviceclient.WorkflowServiceStubs;
19+
import io.temporal.testUtils.Eventually;
20+
import java.time.Duration;
21+
import java.util.Optional;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.ScheduledExecutorService;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import org.junit.After;
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
29+
public class HeartbeatContextImplTest {
30+
31+
private static final long TEST_BUFFER_MILLIS = 200;
32+
33+
private ScheduledExecutorService heartbeatExecutor;
34+
private WorkflowServiceStubs service;
35+
private WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub;
36+
37+
@Before
38+
public void setUp() {
39+
heartbeatExecutor = Executors.newScheduledThreadPool(1);
40+
service = mock(WorkflowServiceStubs.class);
41+
blockingStub = mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class);
42+
when(service.blockingStub()).thenReturn(blockingStub);
43+
when(blockingStub.withOption(any(), any())).thenReturn(blockingStub);
44+
}
45+
46+
@After
47+
public void tearDown() {
48+
heartbeatExecutor.shutdownNow();
49+
}
50+
51+
@Test
52+
public void heartbeatTimeoutLocallyCancelsActivity() {
53+
Duration heartbeatTimeout = Duration.ofMillis(500);
54+
55+
// All heartbeat RPCs fail with UNAVAILABLE
56+
when(blockingStub.recordActivityTaskHeartbeat(any()))
57+
.thenThrow(new StatusRuntimeException(Status.UNAVAILABLE));
58+
59+
ActivityInfo info = activityInfoWithHeartbeatTimeout(heartbeatTimeout);
60+
HeartbeatContextImpl ctx = createHeartbeatContext(info);
61+
62+
long startNanos = System.nanoTime();
63+
ctx.heartbeat("details-1");
64+
65+
ActivityCompletionException caught =
66+
Eventually.assertEventually(
67+
Duration.ofSeconds(10),
68+
() -> {
69+
try {
70+
ctx.heartbeat("poll");
71+
fail("Expected ActivityCanceledException");
72+
return null;
73+
} catch (ActivityCompletionException e) {
74+
return e;
75+
}
76+
});
77+
78+
long elapsedMs = Duration.ofNanos(System.nanoTime() - startNanos).toMillis();
79+
80+
assertSame(ActivityCanceledException.class, caught.getClass());
81+
assertNotNull("Expected a TimeoutFailure cause", caught.getCause());
82+
assertSame(TimeoutFailure.class, caught.getCause().getClass());
83+
assertEquals(
84+
TimeoutType.TIMEOUT_TYPE_HEARTBEAT, ((TimeoutFailure) caught.getCause()).getTimeoutType());
85+
long expectedMinMs = heartbeatTimeout.toMillis() + TEST_BUFFER_MILLIS;
86+
assertTrue(
87+
"Timeout should not fire before heartbeat timeout + buffer ("
88+
+ elapsedMs
89+
+ "ms elapsed, expected >= "
90+
+ expectedMinMs
91+
+ "ms)",
92+
elapsedMs >= expectedMinMs);
93+
94+
ctx.cancelOutstandingHeartbeat();
95+
}
96+
97+
@Test
98+
public void heartbeatTimeoutResetsOnSuccessfulSend() {
99+
Duration heartbeatTimeout = Duration.ofMillis(500);
100+
AtomicInteger callCount = new AtomicInteger();
101+
102+
// First call succeeds, then all subsequent calls fail
103+
when(blockingStub.recordActivityTaskHeartbeat(any()))
104+
.thenAnswer(
105+
invocation -> {
106+
if (callCount.getAndIncrement() == 0) {
107+
return RecordActivityTaskHeartbeatResponse.getDefaultInstance();
108+
}
109+
throw new StatusRuntimeException(Status.UNAVAILABLE);
110+
});
111+
112+
ActivityInfo info = activityInfoWithHeartbeatTimeout(heartbeatTimeout);
113+
HeartbeatContextImpl ctx = createHeartbeatContext(info);
114+
115+
// The first heartbeat() call sends the RPC synchronously (no scheduled heartbeat yet).
116+
// Record the time before calling — the timer reset happens during this call.
117+
long resetNanos = System.nanoTime();
118+
ctx.heartbeat("details-1");
119+
assertEquals("First RPC should have been the successful one", 1, callCount.get());
120+
121+
// Poll until the timeout fires again (from the reset point)
122+
Eventually.assertEventually(
123+
Duration.ofSeconds(10),
124+
() -> {
125+
try {
126+
ctx.heartbeat("poll");
127+
fail("Expected ActivityCanceledException");
128+
} catch (ActivityCanceledException e) {
129+
// expected
130+
}
131+
});
132+
133+
long elapsedSinceResetMs = Duration.ofNanos(System.nanoTime() - resetNanos).toMillis();
134+
long expectedMinMs = heartbeatTimeout.toMillis() + TEST_BUFFER_MILLIS;
135+
assertTrue(
136+
"Timeout should not fire before heartbeat timeout + buffer from reset point ("
137+
+ elapsedSinceResetMs
138+
+ "ms elapsed since reset, expected >= "
139+
+ expectedMinMs
140+
+ "ms)",
141+
elapsedSinceResetMs >= expectedMinMs);
142+
143+
ctx.cancelOutstandingHeartbeat();
144+
}
145+
146+
private HeartbeatContextImpl createHeartbeatContext(ActivityInfo info) {
147+
return new HeartbeatContextImpl(
148+
service,
149+
"test-namespace",
150+
info,
151+
GlobalDataConverter.get(),
152+
heartbeatExecutor,
153+
new NoopScope(),
154+
"test-identity",
155+
Duration.ofSeconds(60),
156+
Duration.ofSeconds(30),
157+
TEST_BUFFER_MILLIS);
158+
}
159+
160+
private static ActivityInfo activityInfoWithHeartbeatTimeout(Duration heartbeatTimeout) {
161+
ActivityInfo info = mock(ActivityInfo.class);
162+
when(info.getHeartbeatTimeout()).thenReturn(heartbeatTimeout);
163+
when(info.getTaskToken()).thenReturn(new byte[] {1, 2, 3});
164+
when(info.getWorkflowId()).thenReturn("test-workflow-id");
165+
when(info.getWorkflowType()).thenReturn("test-workflow-type");
166+
when(info.getActivityType()).thenReturn("test-activity-type");
167+
when(info.getActivityTaskQueue()).thenReturn("test-task-queue");
168+
when(info.getActivityId()).thenReturn("test-activity-id");
169+
when(info.isLocal()).thenReturn(false);
170+
when(info.getHeartbeatDetails()).thenReturn(Optional.empty());
171+
return info;
172+
}
173+
}

0 commit comments

Comments
 (0)