Skip to content

Commit 9516965

Browse files
authored
Cancel timer when Workflow.await condition is satisfied (#2799)
1 parent 9356745 commit 9516965

File tree

4 files changed

+400
-3
lines changed

4 files changed

+400
-3
lines changed

temporal-sdk/src/main/java/io/temporal/internal/common/SdkFlag.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ public enum SdkFlag {
1818
* Changes behavior of CancellationScope to cancel children in a deterministic order.
1919
*/
2020
DETERMINISTIC_CANCELLATION_SCOPE_ORDER(3),
21+
/*
22+
* Changes behavior of Workflow.await(duration, condition) to cancel the timer if the
23+
* condition is resolved before the timeout.
24+
*/
25+
CANCEL_AWAIT_TIMER_ON_CONDITION(4),
2126
UNKNOWN(Integer.MAX_VALUE);
2227

2328
private final int value;

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1321,9 +1321,35 @@ public void sleep(Duration duration) {
13211321

13221322
@Override
13231323
public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
1324-
Promise<Void> timer = newTimer(timeout);
1325-
WorkflowThread.await(reason, () -> (timer.isCompleted() || unblockCondition.get()));
1326-
return !timer.isCompleted();
1324+
// TODO: Change checkSdkFlag to tryUseSdkFlag in the next release to enable this flag by
1325+
// default.
1326+
boolean cancelTimerOnCondition =
1327+
replayContext.checkSdkFlag(SdkFlag.CANCEL_AWAIT_TIMER_ON_CONDITION);
1328+
1329+
if (cancelTimerOnCondition) {
1330+
// If condition is already satisfied, skip creating timer
1331+
if (unblockCondition.get()) {
1332+
return true;
1333+
}
1334+
// Create timer in a cancellation scope so we can cancel it when condition is satisfied
1335+
CompletablePromise<Void> timer = Workflow.newPromise();
1336+
CancellationScope timerScope =
1337+
Workflow.newCancellationScope(() -> timer.completeFrom(newTimer(timeout)));
1338+
timerScope.run();
1339+
1340+
WorkflowThread.await(reason, () -> (timer.isCompleted() || unblockCondition.get()));
1341+
1342+
boolean conditionSatisfied = !timer.isCompleted();
1343+
if (conditionSatisfied) {
1344+
timerScope.cancel("await condition resolved");
1345+
}
1346+
return conditionSatisfied;
1347+
} else {
1348+
// Old behavior: timer is not cancelled when condition is satisfied
1349+
Promise<Void> timer = newTimer(timeout);
1350+
WorkflowThread.await(reason, () -> (timer.isCompleted() || unblockCondition.get()));
1351+
return !timer.isCompleted();
1352+
}
13271353
}
13281354

13291355
@Override
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
package io.temporal.workflow.cancellationTests;
2+
3+
import static org.junit.Assert.*;
4+
5+
import io.temporal.api.common.v1.WorkflowExecution;
6+
import io.temporal.api.enums.v1.EventType;
7+
import io.temporal.client.WorkflowClient;
8+
import io.temporal.client.WorkflowStub;
9+
import io.temporal.internal.common.SdkFlag;
10+
import io.temporal.internal.statemachines.WorkflowStateMachines;
11+
import io.temporal.testing.WorkflowReplayer;
12+
import io.temporal.testing.internal.SDKTestWorkflowRule;
13+
import io.temporal.workflow.SignalMethod;
14+
import io.temporal.workflow.Workflow;
15+
import io.temporal.workflow.WorkflowInterface;
16+
import io.temporal.workflow.WorkflowMethod;
17+
import java.time.Duration;
18+
import java.util.Arrays;
19+
import java.util.Collections;
20+
import java.util.List;
21+
import org.junit.After;
22+
import org.junit.Before;
23+
import org.junit.Rule;
24+
import org.junit.Test;
25+
26+
/**
27+
* Tests for the CANCEL_AWAIT_TIMER_ON_CONDITION SDK flag behavior. Tests verify both old and new
28+
* behavior by explicitly switching the SDK flag, following the Go SDK pattern.
29+
*
30+
* <p>Since the flag is NOT auto-enabled (uses checkSdkFlag, not tryUseSdkFlag), tests must
31+
* explicitly add it to initialFlags to enable the new behavior.
32+
*/
33+
public class WorkflowAwaitCancelTimerOnConditionTest {
34+
35+
private List<SdkFlag> savedInitialFlags;
36+
37+
@Rule
38+
public SDKTestWorkflowRule testWorkflowRule =
39+
SDKTestWorkflowRule.newBuilder()
40+
.setWorkflowTypes(
41+
TestAwaitCancelTimerWorkflowImpl.class,
42+
TestImmediateConditionWorkflowImpl.class,
43+
TestReturnValueWorkflowImpl.class)
44+
.build();
45+
46+
@Before
47+
public void setUp() {
48+
savedInitialFlags = WorkflowStateMachines.initialFlags;
49+
}
50+
51+
@After
52+
public void tearDown() {
53+
WorkflowStateMachines.initialFlags = savedInitialFlags;
54+
}
55+
56+
/**
57+
* Tests that the timer IS cancelled when the flag is explicitly enabled. With
58+
* CANCEL_AWAIT_TIMER_ON_CONDITION in initialFlags, we expect TIMER_CANCELED in history.
59+
*/
60+
@Test
61+
public void testTimerCancelledWhenFlagEnabled() {
62+
WorkflowStateMachines.initialFlags =
63+
Collections.unmodifiableList(
64+
Arrays.asList(
65+
SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION, SdkFlag.CANCEL_AWAIT_TIMER_ON_CONDITION));
66+
67+
TestAwaitWorkflow workflow = testWorkflowRule.newWorkflowStub(TestAwaitWorkflow.class);
68+
WorkflowExecution execution = WorkflowClient.start(workflow::execute);
69+
70+
testWorkflowRule.sleep(Duration.ofMillis(500));
71+
workflow.unblock();
72+
73+
WorkflowStub untyped = WorkflowStub.fromTyped(workflow);
74+
String result = untyped.getResult(String.class);
75+
assertEquals("condition satisfied", result);
76+
77+
testWorkflowRule.assertHistoryEvent(
78+
execution.getWorkflowId(), EventType.EVENT_TYPE_TIMER_STARTED);
79+
testWorkflowRule.assertHistoryEvent(
80+
execution.getWorkflowId(), EventType.EVENT_TYPE_TIMER_CANCELED);
81+
}
82+
83+
/**
84+
* Tests that the timer is NOT cancelled when the flag is disabled (default). Without the flag in
85+
* initialFlags, the old behavior is used: timer runs even after condition is satisfied.
86+
*/
87+
@Test
88+
public void testTimerNotCancelledWhenFlagDisabled() {
89+
// Default initialFlags do NOT include CANCEL_AWAIT_TIMER_ON_CONDITION
90+
TestAwaitWorkflow workflow = testWorkflowRule.newWorkflowStub(TestAwaitWorkflow.class);
91+
WorkflowExecution execution = WorkflowClient.start(workflow::execute);
92+
93+
testWorkflowRule.sleep(Duration.ofMillis(500));
94+
workflow.unblock();
95+
96+
WorkflowStub untyped = WorkflowStub.fromTyped(workflow);
97+
String result = untyped.getResult(String.class);
98+
assertEquals("condition satisfied", result);
99+
100+
testWorkflowRule.assertHistoryEvent(
101+
execution.getWorkflowId(), EventType.EVENT_TYPE_TIMER_STARTED);
102+
testWorkflowRule.assertNoHistoryEvent(
103+
execution.getWorkflowId(), EventType.EVENT_TYPE_TIMER_CANCELED);
104+
}
105+
106+
/**
107+
* Tests that no timer is created when the condition is immediately true and the flag is enabled.
108+
*/
109+
@Test
110+
public void testNoTimerWhenConditionImmediatelySatisfiedWithFlag() {
111+
WorkflowStateMachines.initialFlags =
112+
Collections.unmodifiableList(
113+
Arrays.asList(
114+
SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION, SdkFlag.CANCEL_AWAIT_TIMER_ON_CONDITION));
115+
116+
TestImmediateConditionWorkflow workflow =
117+
testWorkflowRule.newWorkflowStub(TestImmediateConditionWorkflow.class);
118+
WorkflowExecution execution = WorkflowClient.start(workflow::execute);
119+
120+
WorkflowStub untyped = WorkflowStub.fromTyped(workflow);
121+
String result = untyped.getResult(String.class);
122+
assertEquals("immediate condition", result);
123+
124+
testWorkflowRule.assertNoHistoryEvent(
125+
execution.getWorkflowId(), EventType.EVENT_TYPE_TIMER_STARTED);
126+
}
127+
128+
/**
129+
* Tests that the await returns true when condition is satisfied and false when it times out. This
130+
* verifies the return value semantics are preserved with the new flag.
131+
*/
132+
@Test
133+
public void testAwaitReturnValue() {
134+
WorkflowStateMachines.initialFlags =
135+
Collections.unmodifiableList(
136+
Arrays.asList(
137+
SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION, SdkFlag.CANCEL_AWAIT_TIMER_ON_CONDITION));
138+
139+
TestReturnValueWorkflow workflow =
140+
testWorkflowRule.newWorkflowStub(TestReturnValueWorkflow.class);
141+
WorkflowExecution execution = WorkflowClient.start(workflow::execute);
142+
143+
testWorkflowRule.sleep(Duration.ofMillis(500));
144+
workflow.unblock();
145+
146+
WorkflowStub untyped = WorkflowStub.fromTyped(workflow);
147+
String result = untyped.getResult(String.class);
148+
assertEquals("conditionSatisfied=true,timedOut=true", result);
149+
150+
testWorkflowRule.assertHistoryEvent(
151+
execution.getWorkflowId(), EventType.EVENT_TYPE_TIMER_CANCELED);
152+
testWorkflowRule.assertHistoryEvent(
153+
execution.getWorkflowId(), EventType.EVENT_TYPE_TIMER_FIRED);
154+
}
155+
156+
/**
157+
* Tests replay compatibility with old workflow histories that were recorded WITHOUT the
158+
* CANCEL_AWAIT_TIMER_ON_CONDITION flag.
159+
*/
160+
@Test
161+
public void testReplayOldHistoryWithoutFlag() throws Exception {
162+
WorkflowReplayer.replayWorkflowExecutionFromResource(
163+
"awaitTimerConditionOldBehavior.json", TestAwaitCancelTimerWorkflowImpl.class);
164+
}
165+
166+
@WorkflowInterface
167+
public interface TestAwaitWorkflow {
168+
@WorkflowMethod
169+
String execute();
170+
171+
@SignalMethod
172+
void unblock();
173+
}
174+
175+
@WorkflowInterface
176+
public interface TestImmediateConditionWorkflow {
177+
@WorkflowMethod
178+
String execute();
179+
}
180+
181+
@WorkflowInterface
182+
public interface TestReturnValueWorkflow {
183+
@WorkflowMethod
184+
String execute();
185+
186+
@SignalMethod
187+
void unblock();
188+
}
189+
190+
public static class TestAwaitCancelTimerWorkflowImpl implements TestAwaitWorkflow {
191+
private boolean unblocked = false;
192+
193+
@Override
194+
public String execute() {
195+
boolean result = Workflow.await(Duration.ofHours(1), () -> unblocked);
196+
return result ? "condition satisfied" : "timed out";
197+
}
198+
199+
@Override
200+
public void unblock() {
201+
unblocked = true;
202+
}
203+
}
204+
205+
public static class TestImmediateConditionWorkflowImpl implements TestImmediateConditionWorkflow {
206+
@Override
207+
public String execute() {
208+
boolean result = Workflow.await(Duration.ofHours(1), () -> true);
209+
return result ? "immediate condition" : "unexpected timeout";
210+
}
211+
}
212+
213+
public static class TestReturnValueWorkflowImpl implements TestReturnValueWorkflow {
214+
private boolean unblocked = false;
215+
216+
@Override
217+
public String execute() {
218+
boolean conditionSatisfied = Workflow.await(Duration.ofHours(1), () -> unblocked);
219+
boolean timedOut = !Workflow.await(Duration.ofMillis(100), () -> false);
220+
return "conditionSatisfied=" + conditionSatisfied + ",timedOut=" + timedOut;
221+
}
222+
223+
@Override
224+
public void unblock() {
225+
unblocked = true;
226+
}
227+
}
228+
}

0 commit comments

Comments
 (0)