Skip to content

Commit 316a40b

Browse files
committed
[visibility] attempt to produce interleaved replay behavior with
workflow.Async
1 parent 515c90d commit 316a40b

File tree

2 files changed

+185
-1
lines changed

2 files changed

+185
-1
lines changed

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1174,7 +1174,7 @@ public int getVersion(String changeId, int minSupported, int maxSupported) {
11741174
* Previously the SDK would yield on the getVersion call to the scheduler. This is not ideal because it can lead to non-deterministic
11751175
* scheduling if the getVersion call was removed.
11761176
* */
1177-
if (replayContext.tryUseSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION)) {
1177+
if (replayContext.checkSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION)) {
11781178
// This can happen if we are replaying a workflow and encounter a getVersion call that did not
11791179
// exist on the original execution and the range does not include the default version.
11801180
if (versionToUse == null) {
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
package io.temporal.workflow.versionTests;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertFalse;
5+
import static org.junit.Assert.assertTrue;
6+
7+
import io.temporal.activity.ActivityInterface;
8+
import io.temporal.activity.ActivityMethod;
9+
import io.temporal.activity.LocalActivityOptions;
10+
import io.temporal.api.common.v1.WorkflowExecution;
11+
import io.temporal.api.enums.v1.EventType;
12+
import io.temporal.api.history.v1.HistoryEvent;
13+
import io.temporal.client.WorkflowClient;
14+
import io.temporal.client.WorkflowOptions;
15+
import io.temporal.client.WorkflowStub;
16+
import io.temporal.common.WorkflowExecutionHistory;
17+
import io.temporal.internal.common.SdkFlag;
18+
import io.temporal.internal.history.VersionMarkerUtils;
19+
import io.temporal.internal.statemachines.WorkflowStateMachines;
20+
import io.temporal.testing.TestWorkflowEnvironment;
21+
import io.temporal.testing.WorkflowReplayer;
22+
import io.temporal.worker.Worker;
23+
import io.temporal.worker.WorkerOptions;
24+
import io.temporal.workflow.Async;
25+
import io.temporal.workflow.CompletablePromise;
26+
import io.temporal.workflow.Promise;
27+
import io.temporal.workflow.Workflow;
28+
import io.temporal.workflow.WorkflowInterface;
29+
import io.temporal.workflow.WorkflowMethod;
30+
import io.temporal.workflow.unsafe.WorkflowUnsafe;
31+
import java.time.Duration;
32+
import java.util.Collections;
33+
import java.util.List;
34+
import java.util.Locale;
35+
import org.junit.After;
36+
import org.junit.Before;
37+
import org.junit.Test;
38+
39+
public class GetVersionAsyncLocalActivityReplayTest {
40+
private static final String TASK_QUEUE = "get-version-async-local-activity-replay";
41+
private static final String CHANGE_ID = "async-local-activity-change";
42+
43+
private static boolean hasReplayed;
44+
45+
private List<SdkFlag> savedInitialFlags;
46+
47+
@Before
48+
public void setUp() {
49+
hasReplayed = false;
50+
savedInitialFlags = WorkflowStateMachines.initialFlags;
51+
WorkflowStateMachines.initialFlags =
52+
Collections.singletonList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION);
53+
}
54+
55+
@After
56+
public void tearDown() {
57+
WorkflowStateMachines.initialFlags = savedInitialFlags;
58+
}
59+
60+
@Test
61+
public void testGetVersionReplayWithAsyncLocalActivitiesKeepsExpectCBoundToC() throws Exception {
62+
WorkflowExecutionHistory history = executeWorkflowAndCaptureHistory();
63+
64+
assertTrue(hasReplayed);
65+
assertTrue(hasVersionMarker(history, CHANGE_ID));
66+
assertFalse(hasSdkFlag(history, SdkFlag.SKIP_YIELD_ON_VERSION));
67+
68+
WorkflowReplayer.replayWorkflowExecution(history, AsyncLocalActivityWorkflowImpl.class);
69+
}
70+
71+
private WorkflowExecutionHistory executeWorkflowAndCaptureHistory() {
72+
try (TestWorkflowEnvironment testEnvironment = TestWorkflowEnvironment.newInstance()) {
73+
Worker worker =
74+
testEnvironment.newWorker(
75+
TASK_QUEUE,
76+
WorkerOptions.newBuilder()
77+
.setStickyQueueScheduleToStartTimeout(Duration.ZERO)
78+
.build());
79+
worker.registerWorkflowImplementationTypes(AsyncLocalActivityWorkflowImpl.class);
80+
worker.registerActivitiesImplementations(new EchoActivitiesImpl());
81+
testEnvironment.start();
82+
83+
WorkflowClient client = testEnvironment.getWorkflowClient();
84+
ReplayTestWorkflow workflow =
85+
client.newWorkflowStub(
86+
ReplayTestWorkflow.class,
87+
WorkflowOptions.newBuilder()
88+
.setTaskQueue(TASK_QUEUE)
89+
.setWorkflowRunTimeout(Duration.ofMinutes(1))
90+
.setWorkflowTaskTimeout(Duration.ofSeconds(5))
91+
.build());
92+
93+
WorkflowExecution execution = WorkflowClient.start(workflow::execute);
94+
assertEquals("ABC", WorkflowStub.fromTyped(workflow).getResult(String.class));
95+
96+
return client.fetchHistory(execution.getWorkflowId(), execution.getRunId());
97+
}
98+
}
99+
100+
private static boolean hasSdkFlag(WorkflowExecutionHistory history, SdkFlag flag) {
101+
for (HistoryEvent event : history.getEvents()) {
102+
if (event.getEventType() != EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED) {
103+
continue;
104+
}
105+
if (!event.getWorkflowTaskCompletedEventAttributes().hasSdkMetadata()) {
106+
continue;
107+
}
108+
if (event
109+
.getWorkflowTaskCompletedEventAttributes()
110+
.getSdkMetadata()
111+
.getLangUsedFlagsList()
112+
.contains(flag.getValue())) {
113+
return true;
114+
}
115+
}
116+
return false;
117+
}
118+
119+
private static boolean hasVersionMarker(WorkflowExecutionHistory history, String changeId) {
120+
for (HistoryEvent event : history.getEvents()) {
121+
if (changeId.equals(VersionMarkerUtils.tryGetChangeIdFromVersionMarkerEvent(event))) {
122+
return true;
123+
}
124+
}
125+
return false;
126+
}
127+
128+
@WorkflowInterface
129+
public interface ReplayTestWorkflow {
130+
@WorkflowMethod
131+
String execute();
132+
}
133+
134+
@ActivityInterface
135+
public interface EchoActivities {
136+
@ActivityMethod
137+
String echo(String value);
138+
}
139+
140+
public static class EchoActivitiesImpl implements EchoActivities {
141+
@Override
142+
public String echo(String value) {
143+
return value.toUpperCase(Locale.ROOT);
144+
}
145+
}
146+
147+
public static class AsyncLocalActivityWorkflowImpl implements ReplayTestWorkflow {
148+
private final EchoActivities echoActivities =
149+
Workflow.newLocalActivityStub(
150+
EchoActivities.class,
151+
LocalActivityOptions.newBuilder()
152+
.setStartToCloseTimeout(Duration.ofSeconds(5))
153+
.build());
154+
155+
@Override
156+
public String execute() {
157+
CompletablePromise<String> expectA = Workflow.newPromise();
158+
CompletablePromise<String> expectB = Workflow.newPromise();
159+
Promise<Void> asyncBranch =
160+
Async.procedure(
161+
() -> {
162+
expectA.complete(echoActivities.echo("a"));
163+
expectB.complete(echoActivities.echo("b"));
164+
});
165+
166+
int version = Workflow.getVersion(CHANGE_ID, Workflow.DEFAULT_VERSION, 1);
167+
assertEquals(1, version);
168+
169+
String expectC = echoActivities.echo("c");
170+
asyncBranch.get();
171+
172+
assertEquals("A", expectA.get());
173+
assertEquals("B", expectB.get());
174+
assertEquals("C", expectC);
175+
176+
if (WorkflowUnsafe.isReplaying()) {
177+
hasReplayed = true;
178+
}
179+
180+
Workflow.sleep(Duration.ofSeconds(1));
181+
return expectA.get() + expectB.get() + expectC;
182+
}
183+
}
184+
}

0 commit comments

Comments
 (0)