Skip to content

Commit c436443

Browse files
committed
Add replay ordering reproducer for interleaved updates
1 parent a5d3f33 commit c436443

File tree

1 file changed

+177
-0
lines changed

1 file changed

+177
-0
lines changed
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
package io.temporal.internal.replay;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertNotEquals;
5+
import static org.junit.Assert.assertThrows;
6+
import static org.junit.Assert.assertTrue;
7+
8+
import com.uber.m3.tally.NoopScope;
9+
import io.temporal.api.command.v1.Command;
10+
import io.temporal.api.enums.v1.CommandType;
11+
import io.temporal.api.history.v1.HistoryEvent;
12+
import io.temporal.api.query.v1.WorkflowQuery;
13+
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
14+
import io.temporal.client.WorkflowClient;
15+
import io.temporal.common.WorkflowExecutionHistory;
16+
import io.temporal.internal.history.VersionMarkerUtils;
17+
import io.temporal.internal.worker.QueryReplayHelper;
18+
import io.temporal.serviceclient.WorkflowServiceStubs;
19+
import io.temporal.testing.TestWorkflowEnvironment;
20+
import io.temporal.testing.WorkflowHistoryLoader;
21+
import io.temporal.worker.Worker;
22+
import io.temporal.workflow.versionTests.GetVersionInterleavedUpdateReplayTest.GreetingWorkflowImpl;
23+
import java.lang.reflect.Field;
24+
import java.lang.reflect.Method;
25+
import java.util.ArrayList;
26+
import java.util.Arrays;
27+
import java.util.List;
28+
import org.junit.Test;
29+
30+
public class GetVersionInterleavedUpdateReplayTaskHandlerTest {
31+
private static final String HISTORY_RESOURCE =
32+
"testGetVersionInterleavedUpdateReplayHistory.json";
33+
private static final String EXPECTED_NON_DETERMINISTIC_MESSAGE =
34+
"[TMPRL1100] getVersion call before the existing version marker event. The most probable cause is retroactive addition of a getVersion call with an existing 'changeId'";
35+
private static final String EXPECTED_NON_DETERMINISTIC_FRAGMENT =
36+
"io.temporal.worker.NonDeterministicException: " + EXPECTED_NON_DETERMINISTIC_MESSAGE;
37+
private static final String EXPECTED_FIRST_CHANGE_ID = "ChangeId1";
38+
private static final String EXPECTED_SECOND_CHANGE_ID = "ChangeId2";
39+
private static final String TEST_TASK_QUEUE = "get-version-interleaved-update-replay";
40+
41+
@Test
42+
public void testReplayQueuesSecondVersionMarkerBeforeUpdateCompletionCommands() throws Exception {
43+
WorkflowExecutionHistory history =
44+
WorkflowHistoryLoader.readHistoryFromResource(HISTORY_RESOURCE);
45+
assertEquals(
46+
Arrays.asList(EXPECTED_FIRST_CHANGE_ID, EXPECTED_SECOND_CHANGE_ID),
47+
extractVersionChangeIds(history.getEvents()));
48+
49+
TestWorkflowEnvironment testEnvironment = TestWorkflowEnvironment.newInstance();
50+
ReplayWorkflowRunTaskHandler runTaskHandler = null;
51+
try {
52+
Worker worker = testEnvironment.newWorker(TEST_TASK_QUEUE);
53+
worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class);
54+
55+
ReplayWorkflowTaskHandler replayTaskHandler = getNonStickyReplayTaskHandler(worker);
56+
PollWorkflowTaskQueueResponse.Builder replayTask = newReplayTask(history);
57+
runTaskHandler = createStatefulHandler(replayTaskHandler, replayTask);
58+
59+
WorkflowServiceStubs service =
60+
getField(replayTaskHandler, "service", WorkflowServiceStubs.class);
61+
String namespace = getField(replayTaskHandler, "namespace", String.class);
62+
ServiceWorkflowHistoryIterator historyIterator =
63+
new ServiceWorkflowHistoryIterator(service, namespace, replayTask, new NoopScope());
64+
65+
ReplayWorkflowRunTaskHandler replayHandler = runTaskHandler;
66+
RuntimeException thrown =
67+
assertThrows(
68+
RuntimeException.class,
69+
() -> replayHandler.handleDirectQueryWorkflowTask(replayTask, historyIterator));
70+
assertTrue(
71+
"Expected replay failure to contain the nondeterminism marker, but got: " + thrown,
72+
throwableChainContains(thrown, EXPECTED_NON_DETERMINISTIC_FRAGMENT)
73+
|| throwableChainContains(thrown, EXPECTED_NON_DETERMINISTIC_MESSAGE));
74+
75+
List<Command> pendingCommands = runTaskHandler.getWorkflowStateMachines().takeCommands();
76+
int versionMarkerIndex = indexOfVersionMarker(pendingCommands);
77+
int protocolMessageIndex =
78+
indexOfCommandType(pendingCommands, CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE);
79+
80+
assertNotEquals(
81+
"Expected a pending Version marker command after replay failure", -1, versionMarkerIndex);
82+
assertTrue(
83+
"Expected the pending Version marker to be queued before any update completion protocol command: "
84+
+ pendingCommands,
85+
protocolMessageIndex == -1 || versionMarkerIndex < protocolMessageIndex);
86+
} finally {
87+
if (runTaskHandler != null) {
88+
runTaskHandler.close();
89+
}
90+
testEnvironment.close();
91+
}
92+
}
93+
94+
private static PollWorkflowTaskQueueResponse.Builder newReplayTask(
95+
WorkflowExecutionHistory history) {
96+
return PollWorkflowTaskQueueResponse.newBuilder()
97+
.setWorkflowExecution(history.getWorkflowExecution())
98+
.setWorkflowType(
99+
history
100+
.getHistory()
101+
.getEvents(0)
102+
.getWorkflowExecutionStartedEventAttributes()
103+
.getWorkflowType())
104+
.setStartedEventId(Long.MAX_VALUE)
105+
.setPreviousStartedEventId(Long.MAX_VALUE)
106+
.setHistory(history.getHistory())
107+
.setQuery(WorkflowQuery.newBuilder().setQueryType(WorkflowClient.QUERY_TYPE_REPLAY_ONLY));
108+
}
109+
110+
private static ReplayWorkflowTaskHandler getNonStickyReplayTaskHandler(Worker worker)
111+
throws Exception {
112+
Object workflowWorker = getField(worker, "workflowWorker", Object.class);
113+
QueryReplayHelper queryReplayHelper =
114+
getField(workflowWorker, "queryReplayHelper", QueryReplayHelper.class);
115+
return getField(queryReplayHelper, "handler", ReplayWorkflowTaskHandler.class);
116+
}
117+
118+
private static ReplayWorkflowRunTaskHandler createStatefulHandler(
119+
ReplayWorkflowTaskHandler replayTaskHandler, PollWorkflowTaskQueueResponse.Builder replayTask)
120+
throws Exception {
121+
Method method =
122+
ReplayWorkflowTaskHandler.class.getDeclaredMethod(
123+
"createStatefulHandler",
124+
PollWorkflowTaskQueueResponse.Builder.class,
125+
com.uber.m3.tally.Scope.class);
126+
method.setAccessible(true);
127+
return (ReplayWorkflowRunTaskHandler)
128+
method.invoke(replayTaskHandler, replayTask, new NoopScope());
129+
}
130+
131+
private static List<String> extractVersionChangeIds(List<HistoryEvent> events) {
132+
List<String> changeIds = new ArrayList<>();
133+
for (HistoryEvent event : events) {
134+
String changeId = VersionMarkerUtils.tryGetChangeIdFromVersionMarkerEvent(event);
135+
if (changeId != null) {
136+
changeIds.add(changeId);
137+
}
138+
}
139+
return changeIds;
140+
}
141+
142+
private static int indexOfVersionMarker(List<Command> commands) {
143+
for (int i = 0; i < commands.size(); i++) {
144+
if (VersionMarkerUtils.hasVersionMarkerStructure(commands.get(i))) {
145+
return i;
146+
}
147+
}
148+
return -1;
149+
}
150+
151+
private static int indexOfCommandType(List<Command> commands, CommandType commandType) {
152+
for (int i = 0; i < commands.size(); i++) {
153+
if (commands.get(i).getCommandType() == commandType) {
154+
return i;
155+
}
156+
}
157+
return -1;
158+
}
159+
160+
private static boolean throwableChainContains(Throwable throwable, String expected) {
161+
Throwable current = throwable;
162+
while (current != null) {
163+
if (String.valueOf(current).contains(expected)) {
164+
return true;
165+
}
166+
current = current.getCause();
167+
}
168+
return false;
169+
}
170+
171+
private static <T> T getField(Object target, String fieldName, Class<T> expectedType)
172+
throws Exception {
173+
Field field = target.getClass().getDeclaredField(fieldName);
174+
field.setAccessible(true);
175+
return expectedType.cast(field.get(target));
176+
}
177+
}

0 commit comments

Comments
 (0)