Skip to content

Commit 5d969b8

Browse files
authored
tryUseSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION) so that we default to enabling SKIP_YIELD_ON_VERSION when calling getVersion() (#2819)
* Add getVersion failure regression test * Add SKIP_YIELD_ON_VERSION to WorkflowStateMachines.initialFlags, fixing exception masking after multiple `getVersion()` calls * default to `SKIP_YIELD_ON_VERSION` using `tryUseSdkFlag()` instead of adding it to `initialFlags` * Strengthen GetVersionMultithreadingRemoveTest to verify that setting `SKIP_YIELD_ON_VERSION` by default corrects for the issue.
1 parent b9b0f6b commit 5d969b8

File tree

4 files changed

+106
-13
lines changed

4 files changed

+106
-13
lines changed

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ enum HandleEventStatus {
5353
/** Initial set of SDK flags that will be set on all new workflow executions. */
5454
@VisibleForTesting
5555
public static List<SdkFlag> initialFlags =
56-
Collections.unmodifiableList(Arrays.asList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION));
56+
Collections.singletonList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION);
5757

5858
/**
5959
* Keep track of the change versions that have been seen by the SDK. This is used to generate the

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1174,7 +1174,7 @@ public int getVersion(String changeId, int minSupported, int maxSupported) {
11741174
* Previously the SDK would yield on the getVersion call to the scheduler. This is not ideal because it can lead to non-deterministic
11751175
* scheduling if the getVersion call was removed.
11761176
* */
1177-
if (replayContext.checkSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION)) {
1177+
if (replayContext.tryUseSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION)) {
11781178
// This can happen if we are replaying a workflow and encounter a getVersion call that did not
11791179
// exist on the original execution and the range does not include the default version.
11801180
if (versionToUse == null) {
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package io.temporal.workflow.failure;
2+
3+
import static io.temporal.testUtils.Eventually.assertEventually;
4+
5+
import io.temporal.api.common.v1.WorkflowExecution;
6+
import io.temporal.api.enums.v1.EventType;
7+
import io.temporal.api.failure.v1.Failure;
8+
import io.temporal.api.history.v1.HistoryEvent;
9+
import io.temporal.client.WorkflowClient;
10+
import io.temporal.client.WorkflowException;
11+
import io.temporal.client.WorkflowStub;
12+
import io.temporal.testing.internal.SDKTestWorkflowRule;
13+
import io.temporal.workflow.Workflow;
14+
import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1;
15+
import java.time.Duration;
16+
import java.util.List;
17+
import org.junit.Assert;
18+
import org.junit.Rule;
19+
import org.junit.Test;
20+
import org.junit.rules.TestName;
21+
22+
public class WorkflowFailureGetVersionTest {
23+
24+
@Rule public TestName testName = new TestName();
25+
26+
@Rule
27+
public SDKTestWorkflowRule testWorkflowRule =
28+
SDKTestWorkflowRule.newBuilder()
29+
.setWorkflowTypes(TestWorkflowGetVersionAndException.class)
30+
.build();
31+
32+
@Test
33+
public void getVersionAndException() {
34+
TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
35+
WorkflowExecution execution = WorkflowClient.start(workflow::execute, testName.getMethodName());
36+
WorkflowStub workflowStub = WorkflowStub.fromTyped(workflow);
37+
38+
try {
39+
HistoryEvent workflowTaskFailed =
40+
assertEventually(
41+
Duration.ofSeconds(5),
42+
() -> {
43+
List<HistoryEvent> failedEvents =
44+
testWorkflowRule.getHistoryEvents(
45+
execution.getWorkflowId(), EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED);
46+
Assert.assertFalse("No workflow task failure recorded", failedEvents.isEmpty());
47+
return failedEvents.get(0);
48+
});
49+
50+
Failure failure =
51+
getDeepestFailure(workflowTaskFailed.getWorkflowTaskFailedEventAttributes().getFailure());
52+
Assert.assertEquals("Any error", failure.getMessage());
53+
Assert.assertTrue(failure.hasApplicationFailureInfo());
54+
Assert.assertEquals(
55+
RuntimeException.class.getName(), failure.getApplicationFailureInfo().getType());
56+
} finally {
57+
try {
58+
workflowStub.terminate("terminate test workflow");
59+
} catch (WorkflowException ignored) {
60+
}
61+
}
62+
}
63+
64+
private static Failure getDeepestFailure(Failure failure) {
65+
while (failure.hasCause()) {
66+
failure = failure.getCause();
67+
}
68+
return failure;
69+
}
70+
71+
public static class TestWorkflowGetVersionAndException implements TestWorkflow1 {
72+
73+
@Override
74+
public String execute(String unused) {
75+
String changeId = "change-id";
76+
Workflow.getVersion(changeId, Workflow.DEFAULT_VERSION, 1);
77+
Workflow.getVersion(changeId, Workflow.DEFAULT_VERSION, 1);
78+
throw new RuntimeException("Any error");
79+
}
80+
}
81+
}

temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionMultithreadingRemoveTest.java

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,33 @@
11
package io.temporal.workflow.versionTests;
22

3-
import static org.junit.Assert.*;
4-
import static org.junit.Assume.assumeTrue;
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertTrue;
55

66
import io.temporal.internal.Issue;
77
import io.temporal.testing.WorkflowReplayer;
88
import io.temporal.testing.internal.SDKTestOptions;
99
import io.temporal.testing.internal.SDKTestWorkflowRule;
10+
import io.temporal.testing.internal.TracingWorkerInterceptor;
1011
import io.temporal.worker.WorkerOptions;
1112
import io.temporal.workflow.Async;
1213
import io.temporal.workflow.Workflow;
1314
import io.temporal.workflow.shared.TestActivities;
1415
import io.temporal.workflow.shared.TestWorkflows;
1516
import io.temporal.workflow.unsafe.WorkflowUnsafe;
1617
import java.time.Duration;
18+
import org.junit.Before;
1719
import org.junit.Rule;
1820
import org.junit.Test;
1921

2022
@Issue("https://github.com/temporalio/sdk-java/issues/2307")
21-
public class GetVersionMultithreadingRemoveTest extends BaseVersionTest {
23+
public class GetVersionMultithreadingRemoveTest {
2224

2325
private static boolean hasReplayed;
2426

2527
@Rule
2628
public SDKTestWorkflowRule testWorkflowRule =
2729
SDKTestWorkflowRule.newBuilder()
28-
.setWorkflowTypes(
29-
getDefaultWorkflowImplementationOptions(), TestGetVersionWorkflowImpl.class)
30+
.setWorkflowTypes(TestGetVersionWorkflowImpl.class)
3031
.setActivityImplementations(new TestActivities.TestActivitiesImpl())
3132
// Forcing a replay. Full history arrived from a normal queue causing a replay.
3233
.setWorkerOptions(
@@ -35,18 +36,30 @@ public class GetVersionMultithreadingRemoveTest extends BaseVersionTest {
3536
.build())
3637
.build();
3738

38-
public GetVersionMultithreadingRemoveTest(boolean setVersioningFlag, boolean upsertVersioningSA) {
39-
super(setVersioningFlag, upsertVersioningSA);
39+
@Before
40+
public void setUp() {
41+
hasReplayed = false;
4042
}
4143

4244
@Test
4345
public void testGetVersionMultithreadingRemoval() {
44-
assumeTrue("This test only passes if SKIP_YIELD_ON_VERSION is enabled", setVersioningFlag);
4546
TestWorkflows.TestWorkflow1 workflowStub =
4647
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class);
48+
4749
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
50+
4851
assertTrue(hasReplayed);
4952
assertEquals("activity1", result);
53+
testWorkflowRule
54+
.getInterceptor(TracingWorkerInterceptor.class)
55+
.setExpected(
56+
"interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP,
57+
"newThread workflow-method",
58+
"newThread null",
59+
"getVersion",
60+
"executeActivity customActivity1",
61+
"sleep PT1S",
62+
"activity customActivity1");
5063
}
5164

5265
@Test
@@ -76,9 +89,8 @@ public String execute(String taskQueue) {
7689
} else {
7790
hasReplayed = true;
7891
}
79-
String result =
80-
"activity" + testActivities.activity1(1); // This is executed in non-replay mode.
81-
return result;
92+
93+
return "activity" + testActivities.activity1(1);
8294
}
8395
}
8496
}

0 commit comments

Comments
 (0)