Skip to content

Commit 53449bb

Browse files
authored
Introduce local timeout for activity heartbeats (#2804)
* Introduce local timeout for activity heartbeats * Use Java property instead of env var * Get rid of unnecessary future
1 parent 6ba0947 commit 53449bb

File tree

3 files changed

+299
-0
lines changed

3 files changed

+299
-0
lines changed

temporal-sdk/src/main/java/io/temporal/client/ActivityCanceledException.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ public ActivityCanceledException(ActivityInfo info) {
2222
super(info);
2323
}
2424

25+
public ActivityCanceledException(ActivityInfo info, Throwable cause) {
26+
super(info, cause);
27+
}
28+
2529
public ActivityCanceledException() {
2630
super();
2731
}

temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
import io.temporal.activity.ActivityExecutionContext;
77
import io.temporal.activity.ActivityInfo;
88
import io.temporal.api.common.v1.Payloads;
9+
import io.temporal.api.enums.v1.TimeoutType;
910
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse;
1011
import io.temporal.client.*;
1112
import io.temporal.common.converter.DataConverter;
13+
import io.temporal.failure.TimeoutFailure;
1214
import io.temporal.internal.client.ActivityClientHelper;
1315
import io.temporal.payload.context.ActivitySerializationContext;
1416
import io.temporal.serviceclient.WorkflowServiceStubs;
@@ -28,6 +30,21 @@
2830
class HeartbeatContextImpl implements HeartbeatContext {
2931
private static final Logger log = LoggerFactory.getLogger(HeartbeatContextImpl.class);
3032
private static final long HEARTBEAT_RETRY_WAIT_MILLIS = 1000;
33+
// Buffer added to the heartbeat timeout to avoid racing with the server's own timeout tracking.
34+
private static final long DEFAULT_LOCAL_HEARTBEAT_TIMEOUT_BUFFER_MILLIS = 5000;
35+
static final String LOCAL_TIMEOUT_BUFFER_PROPERTY = "temporal.activity.localTimeoutBufferMs";
36+
37+
static long getLocalHeartbeatTimeoutBufferMillis() {
38+
String val = System.getProperty(LOCAL_TIMEOUT_BUFFER_PROPERTY);
39+
if (val != null) {
40+
try {
41+
return Long.parseLong(val);
42+
} catch (NumberFormatException e) {
43+
log.warn("Invalid {} value: {}", LOCAL_TIMEOUT_BUFFER_PROPERTY, val);
44+
}
45+
}
46+
return DEFAULT_LOCAL_HEARTBEAT_TIMEOUT_BUFFER_MILLIS;
47+
}
3148

3249
private final Lock lock = new ReentrantLock();
3350

@@ -42,13 +59,20 @@ class HeartbeatContextImpl implements HeartbeatContext {
4259

4360
private final Scope metricsScope;
4461
private final Optional<Payloads> prevAttemptHeartbeatDetails;
62+
private final long heartbeatTimeoutMillis;
63+
private final long localHeartbeatTimeoutBufferMillis;
4564

4665
// turned into true on a reception of the first heartbeat
4766
private boolean receivedAHeartbeat = false;
4867
private Object lastDetails;
4968
private boolean hasOutstandingHeartbeat;
5069
private ScheduledFuture<?> scheduledHeartbeat;
5170

71+
// Deadline (in nanos, from System.nanoTime()) by which a successful heartbeat must occur.
72+
// 0 means no local timeout is active.
73+
private long heartbeatTimeoutDeadlineNanos;
74+
private boolean heartbeatTimedOut;
75+
5276
private ActivityCompletionException lastException;
5377

5478
public HeartbeatContextImpl(
@@ -61,6 +85,30 @@ public HeartbeatContextImpl(
6185
String identity,
6286
Duration maxHeartbeatThrottleInterval,
6387
Duration defaultHeartbeatThrottleInterval) {
88+
this(
89+
service,
90+
namespace,
91+
info,
92+
dataConverter,
93+
heartbeatExecutor,
94+
metricsScope,
95+
identity,
96+
maxHeartbeatThrottleInterval,
97+
defaultHeartbeatThrottleInterval,
98+
getLocalHeartbeatTimeoutBufferMillis());
99+
}
100+
101+
HeartbeatContextImpl(
102+
WorkflowServiceStubs service,
103+
String namespace,
104+
ActivityInfo info,
105+
DataConverter dataConverter,
106+
ScheduledExecutorService heartbeatExecutor,
107+
Scope metricsScope,
108+
String identity,
109+
Duration maxHeartbeatThrottleInterval,
110+
Duration defaultHeartbeatThrottleInterval,
111+
long localHeartbeatTimeoutBufferMillis) {
64112
this.service = service;
65113
this.metricsScope = metricsScope;
66114
this.dataConverter = dataConverter;
@@ -83,6 +131,11 @@ public HeartbeatContextImpl(
83131
info.getHeartbeatTimeout(),
84132
maxHeartbeatThrottleInterval,
85133
defaultHeartbeatThrottleInterval);
134+
this.heartbeatTimeoutMillis = info.getHeartbeatTimeout().toMillis();
135+
this.localHeartbeatTimeoutBufferMillis = localHeartbeatTimeoutBufferMillis;
136+
if (this.heartbeatTimeoutMillis > 0) {
137+
this.heartbeatTimeoutDeadlineNanos = computeHeartbeatTimeoutDeadlineNanos();
138+
}
86139
}
87140

88141
/**
@@ -95,6 +148,7 @@ public <V> void heartbeat(V details) throws ActivityCompletionException {
95148
}
96149
lock.lock();
97150
try {
151+
checkHeartbeatTimeoutDeadlineLocked();
98152
receivedAHeartbeat = true;
99153
lastDetails = details;
100154
hasOutstandingHeartbeat = true;
@@ -167,6 +221,7 @@ public void cancelOutstandingHeartbeat() {
167221
scheduledHeartbeat.cancel(false);
168222
scheduledHeartbeat = null;
169223
}
224+
heartbeatTimeoutDeadlineNanos = 0;
170225
hasOutstandingHeartbeat = false;
171226
} finally {
172227
lock.unlock();
@@ -179,6 +234,12 @@ private void doHeartBeatLocked(Object details) {
179234
sendHeartbeatRequest(details);
180235
hasOutstandingHeartbeat = false;
181236
nextHeartbeatDelay = heartbeatIntervalMillis;
237+
// Reset the local heartbeat timeout deadline only on successful send.
238+
// If sends keep failing, the next heartbeat() call after the deadline will cancel the
239+
// activity.
240+
if (heartbeatTimeoutDeadlineNanos != 0) {
241+
heartbeatTimeoutDeadlineNanos = computeHeartbeatTimeoutDeadlineNanos();
242+
}
182243
} catch (StatusRuntimeException e) {
183244
// Not rethrowing to not fail activity implementation on intermittent connection or Temporal
184245
// errors.
@@ -214,6 +275,27 @@ private void scheduleNextHeartbeatLocked(long delay) {
214275
TimeUnit.MILLISECONDS);
215276
}
216277

278+
private long computeHeartbeatTimeoutDeadlineNanos() {
279+
return System.nanoTime()
280+
+ TimeUnit.MILLISECONDS.toNanos(heartbeatTimeoutMillis + localHeartbeatTimeoutBufferMillis);
281+
}
282+
283+
private void checkHeartbeatTimeoutDeadlineLocked() {
284+
if (heartbeatTimedOut) {
285+
throw new ActivityCanceledException(
286+
info, new TimeoutFailure(null, null, TimeoutType.TIMEOUT_TYPE_HEARTBEAT));
287+
}
288+
if (heartbeatTimeoutDeadlineNanos != 0 && System.nanoTime() >= heartbeatTimeoutDeadlineNanos) {
289+
heartbeatTimedOut = true;
290+
log.warn(
291+
"Activity heartbeat timed out locally. ActivityId={}, activityType={}",
292+
info.getActivityId(),
293+
info.getActivityType());
294+
throw new ActivityCanceledException(
295+
info, new TimeoutFailure(null, null, TimeoutType.TIMEOUT_TYPE_HEARTBEAT));
296+
}
297+
}
298+
217299
private void sendHeartbeatRequest(Object details) {
218300
try {
219301
RecordActivityTaskHeartbeatResponse status =
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
package io.temporal.internal.activity;
2+
3+
import static org.junit.Assert.*;
4+
import static org.mockito.ArgumentMatchers.any;
5+
import static org.mockito.Mockito.*;
6+
7+
import com.uber.m3.tally.NoopScope;
8+
import io.grpc.Status;
9+
import io.grpc.StatusRuntimeException;
10+
import io.temporal.activity.ActivityInfo;
11+
import io.temporal.api.enums.v1.TimeoutType;
12+
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse;
13+
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
14+
import io.temporal.client.ActivityCanceledException;
15+
import io.temporal.client.ActivityCompletionException;
16+
import io.temporal.common.converter.GlobalDataConverter;
17+
import io.temporal.failure.TimeoutFailure;
18+
import io.temporal.serviceclient.WorkflowServiceStubs;
19+
import io.temporal.testUtils.Eventually;
20+
import java.time.Duration;
21+
import java.util.Optional;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.ScheduledExecutorService;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import org.junit.After;
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
29+
public class HeartbeatContextImplTest {
30+
31+
private static final long TEST_BUFFER_MILLIS = 200;
32+
33+
private ScheduledExecutorService heartbeatExecutor;
34+
private WorkflowServiceStubs service;
35+
private WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub;
36+
37+
@Before
38+
public void setUp() {
39+
heartbeatExecutor = Executors.newScheduledThreadPool(1);
40+
service = mock(WorkflowServiceStubs.class);
41+
blockingStub = mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class);
42+
when(service.blockingStub()).thenReturn(blockingStub);
43+
when(blockingStub.withOption(any(), any())).thenReturn(blockingStub);
44+
}
45+
46+
@After
47+
public void tearDown() {
48+
heartbeatExecutor.shutdownNow();
49+
}
50+
51+
@Test
52+
public void heartbeatTimeoutLocallyCancelsActivity() {
53+
Duration heartbeatTimeout = Duration.ofMillis(500);
54+
55+
// All heartbeat RPCs fail with UNAVAILABLE
56+
when(blockingStub.recordActivityTaskHeartbeat(any()))
57+
.thenThrow(new StatusRuntimeException(Status.UNAVAILABLE));
58+
59+
ActivityInfo info = activityInfoWithHeartbeatTimeout(heartbeatTimeout);
60+
HeartbeatContextImpl ctx = createHeartbeatContext(info);
61+
62+
long startNanos = System.nanoTime();
63+
ctx.heartbeat("details-1");
64+
65+
ActivityCompletionException caught =
66+
Eventually.assertEventually(
67+
Duration.ofSeconds(10),
68+
() -> {
69+
try {
70+
ctx.heartbeat("poll");
71+
fail("Expected ActivityCanceledException");
72+
return null;
73+
} catch (ActivityCompletionException e) {
74+
return e;
75+
}
76+
});
77+
78+
long elapsedMs = Duration.ofNanos(System.nanoTime() - startNanos).toMillis();
79+
80+
assertSame(ActivityCanceledException.class, caught.getClass());
81+
assertNotNull("Expected a TimeoutFailure cause", caught.getCause());
82+
assertSame(TimeoutFailure.class, caught.getCause().getClass());
83+
assertEquals(
84+
TimeoutType.TIMEOUT_TYPE_HEARTBEAT, ((TimeoutFailure) caught.getCause()).getTimeoutType());
85+
long expectedMinMs = heartbeatTimeout.toMillis() + TEST_BUFFER_MILLIS;
86+
assertTrue(
87+
"Timeout should not fire before heartbeat timeout + buffer ("
88+
+ elapsedMs
89+
+ "ms elapsed, expected >= "
90+
+ expectedMinMs
91+
+ "ms)",
92+
elapsedMs >= expectedMinMs);
93+
94+
ctx.cancelOutstandingHeartbeat();
95+
}
96+
97+
@Test
98+
public void heartbeatTimeoutResetsOnSuccessfulSend() {
99+
Duration heartbeatTimeout = Duration.ofMillis(500);
100+
AtomicInteger callCount = new AtomicInteger();
101+
102+
// First call succeeds, then all subsequent calls fail
103+
when(blockingStub.recordActivityTaskHeartbeat(any()))
104+
.thenAnswer(
105+
invocation -> {
106+
if (callCount.getAndIncrement() == 0) {
107+
return RecordActivityTaskHeartbeatResponse.getDefaultInstance();
108+
}
109+
throw new StatusRuntimeException(Status.UNAVAILABLE);
110+
});
111+
112+
ActivityInfo info = activityInfoWithHeartbeatTimeout(heartbeatTimeout);
113+
HeartbeatContextImpl ctx = createHeartbeatContext(info);
114+
115+
// The first heartbeat() call sends the RPC synchronously (no scheduled heartbeat yet).
116+
// Record the time before calling — the timer reset happens during this call.
117+
long resetNanos = System.nanoTime();
118+
ctx.heartbeat("details-1");
119+
assertEquals("First RPC should have been the successful one", 1, callCount.get());
120+
121+
// Poll until the timeout fires again (from the reset point)
122+
Eventually.assertEventually(
123+
Duration.ofSeconds(10),
124+
() -> {
125+
try {
126+
ctx.heartbeat("poll");
127+
fail("Expected ActivityCanceledException");
128+
} catch (ActivityCanceledException e) {
129+
// expected
130+
}
131+
});
132+
133+
long elapsedSinceResetMs = Duration.ofNanos(System.nanoTime() - resetNanos).toMillis();
134+
long expectedMinMs = heartbeatTimeout.toMillis() + TEST_BUFFER_MILLIS;
135+
assertTrue(
136+
"Timeout should not fire before heartbeat timeout + buffer from reset point ("
137+
+ elapsedSinceResetMs
138+
+ "ms elapsed since reset, expected >= "
139+
+ expectedMinMs
140+
+ "ms)",
141+
elapsedSinceResetMs >= expectedMinMs);
142+
143+
ctx.cancelOutstandingHeartbeat();
144+
}
145+
146+
@Test
147+
public void heartbeatTimeoutPersistsAcrossMultipleCalls() {
148+
Duration heartbeatTimeout = Duration.ofMillis(500);
149+
150+
// All heartbeat RPCs fail with UNAVAILABLE
151+
when(blockingStub.recordActivityTaskHeartbeat(any()))
152+
.thenThrow(new StatusRuntimeException(Status.UNAVAILABLE));
153+
154+
ActivityInfo info = activityInfoWithHeartbeatTimeout(heartbeatTimeout);
155+
HeartbeatContextImpl ctx = createHeartbeatContext(info);
156+
157+
ctx.heartbeat("details-1");
158+
159+
// Wait for timeout to fire
160+
Eventually.assertEventually(
161+
Duration.ofSeconds(10),
162+
() -> {
163+
try {
164+
ctx.heartbeat("poll");
165+
fail("Expected ActivityCanceledException");
166+
} catch (ActivityCanceledException e) {
167+
// expected
168+
}
169+
});
170+
171+
// Subsequent calls should continue to throw
172+
for (int i = 0; i < 5; i++) {
173+
try {
174+
ctx.heartbeat("details-" + i);
175+
fail("Expected ActivityCanceledException on call " + i);
176+
} catch (ActivityCompletionException e) {
177+
assertSame(ActivityCanceledException.class, e.getClass());
178+
assertNotNull(e.getCause());
179+
assertSame(TimeoutFailure.class, e.getCause().getClass());
180+
}
181+
}
182+
183+
ctx.cancelOutstandingHeartbeat();
184+
}
185+
186+
private HeartbeatContextImpl createHeartbeatContext(ActivityInfo info) {
187+
return new HeartbeatContextImpl(
188+
service,
189+
"test-namespace",
190+
info,
191+
GlobalDataConverter.get(),
192+
heartbeatExecutor,
193+
new NoopScope(),
194+
"test-identity",
195+
Duration.ofSeconds(60),
196+
Duration.ofSeconds(30),
197+
TEST_BUFFER_MILLIS);
198+
}
199+
200+
private static ActivityInfo activityInfoWithHeartbeatTimeout(Duration heartbeatTimeout) {
201+
ActivityInfo info = mock(ActivityInfo.class);
202+
when(info.getHeartbeatTimeout()).thenReturn(heartbeatTimeout);
203+
when(info.getTaskToken()).thenReturn(new byte[] {1, 2, 3});
204+
when(info.getWorkflowId()).thenReturn("test-workflow-id");
205+
when(info.getWorkflowType()).thenReturn("test-workflow-type");
206+
when(info.getActivityType()).thenReturn("test-activity-type");
207+
when(info.getActivityTaskQueue()).thenReturn("test-task-queue");
208+
when(info.getActivityId()).thenReturn("test-activity-id");
209+
when(info.isLocal()).thenReturn(false);
210+
when(info.getHeartbeatDetails()).thenReturn(Optional.empty());
211+
return info;
212+
}
213+
}

0 commit comments

Comments
 (0)