6969public final class TestWorkflowService extends WorkflowServiceGrpc .WorkflowServiceImplBase
7070 implements Closeable {
7171 private static final Logger log = LoggerFactory .getLogger (TestWorkflowService .class );
72- private static final JsonFormat .Printer JSON_PRINTER = JsonFormat .printer ();
7372 private static final JsonFormat .Parser JSON_PARSER = JsonFormat .parser ();
7473
7574 private static final String FAILURE_TYPE_STRING = Failure .getDescriptor ().getFullName ();
7675
7776 private final Map <ExecutionId , TestWorkflowMutableState > executions = new HashMap <>();
7877 // key->WorkflowId
7978 private final Map <WorkflowId , TestWorkflowMutableState > executionsByWorkflowId = new HashMap <>();
79+ private final Map <WorkflowChainId , TestWorkflowMutableState > executionsByFirstExecutionRunId =
80+ new HashMap <>();
8081 private final ExecutorService executor = Executors .newCachedThreadPool ();
8182 private final Lock lock = new ReentrantLock ();
8283
@@ -166,6 +167,52 @@ private TestWorkflowMutableState getMutableState(ExecutionId executionId) {
166167 return getMutableState (executionId , true );
167168 }
168169
170+ private static final class WorkflowChainId {
171+ private final String namespace ;
172+ private final String workflowId ;
173+ private final String firstExecutionRunId ;
174+
175+ private WorkflowChainId (String namespace , String workflowId , String firstExecutionRunId ) {
176+ this .namespace = Objects .requireNonNull (namespace );
177+ this .workflowId = Objects .requireNonNull (workflowId );
178+ this .firstExecutionRunId = Objects .requireNonNull (firstExecutionRunId );
179+ }
180+
181+ @ Override
182+ public boolean equals (Object o ) {
183+ if (this == o ) {
184+ return true ;
185+ }
186+ if (o == null || getClass () != o .getClass ()) {
187+ return false ;
188+ }
189+ WorkflowChainId that = (WorkflowChainId ) o ;
190+ return namespace .equals (that .namespace )
191+ && workflowId .equals (that .workflowId )
192+ && firstExecutionRunId .equals (that .firstExecutionRunId );
193+ }
194+
195+ @ Override
196+ public int hashCode () {
197+ return Objects .hash (namespace , workflowId , firstExecutionRunId );
198+ }
199+
200+ @ Override
201+ public String toString () {
202+ return "WorkflowChainId{"
203+ + "namespace='"
204+ + namespace
205+ + '\''
206+ + ", workflowId='"
207+ + workflowId
208+ + '\''
209+ + ", firstExecutionRunId='"
210+ + firstExecutionRunId
211+ + '\''
212+ + '}' ;
213+ }
214+ }
215+
169216 private TestWorkflowMutableState getMutableState (ExecutionId executionId , boolean failNotExists ) {
170217 lock .lock ();
171218 try {
@@ -205,6 +252,63 @@ private TestWorkflowMutableState getMutableState(WorkflowId workflowId, boolean
205252 }
206253 }
207254
255+ private TestWorkflowMutableState getMutableState (
256+ WorkflowChainId workflowChainId , boolean failNotExists ) {
257+ lock .lock ();
258+ try {
259+ TestWorkflowMutableState mutableState = executionsByFirstExecutionRunId .get (workflowChainId );
260+ if (mutableState == null && failNotExists ) {
261+ throw Status .NOT_FOUND
262+ .withDescription ("Execution not found in mutable state: " + workflowChainId )
263+ .asRuntimeException ();
264+ }
265+ return mutableState ;
266+ } finally {
267+ lock .unlock ();
268+ }
269+ }
270+
271+ private TestWorkflowMutableState getMutableState (
272+ String namespace ,
273+ WorkflowExecution execution ,
274+ String firstExecutionRunId ,
275+ boolean failNotExists ) {
276+ ExecutionId executionId = new ExecutionId (namespace , execution );
277+ WorkflowChainId workflowChainId =
278+ firstExecutionRunId .isEmpty ()
279+ ? null
280+ : new WorkflowChainId (namespace , execution .getWorkflowId (), firstExecutionRunId );
281+
282+ if (workflowChainId != null ) {
283+ TestWorkflowMutableState mutableStateByFirstRunId = getMutableState (workflowChainId , false );
284+ if (mutableStateByFirstRunId != null ) {
285+ return mutableStateByFirstRunId ;
286+ }
287+ }
288+
289+ TestWorkflowMutableState mutableState = getMutableState (executionId , false );
290+ if (mutableState != null ) {
291+ if (workflowChainId == null ) {
292+ return mutableState ;
293+ }
294+ WorkflowExecution mutableStateExecution = mutableState .getExecutionId ().getExecution ();
295+ if (mutableStateExecution .getRunId ().equals (execution .getRunId ())
296+ && firstExecutionRunId .equals (mutableState .getFirstExecutionRunId ())) {
297+ return mutableState ;
298+ }
299+ }
300+
301+ if (failNotExists ) {
302+ if (workflowChainId != null ) {
303+ throw Status .NOT_FOUND
304+ .withDescription ("Execution not found in mutable state: " + workflowChainId )
305+ .asRuntimeException ();
306+ }
307+ return getMutableState (executionId , true );
308+ }
309+ return null ;
310+ }
311+
208312 @ Override
209313 public void startWorkflowExecution (
210314 StartWorkflowExecutionRequest request ,
@@ -454,6 +558,11 @@ private StartWorkflowExecutionResponse startWorkflowExecutionNoRunningCheckLocke
454558 WorkflowExecution execution = mutableState .getExecutionId ().getExecution ();
455559 ExecutionId executionId = new ExecutionId (namespace , execution );
456560 executionsByWorkflowId .put (workflowId , mutableState );
561+ if (!firstExecutionRunId .isEmpty ()) {
562+ executionsByFirstExecutionRunId .put (
563+ new WorkflowChainId (namespace , workflowId .getWorkflowId (), firstExecutionRunId ),
564+ mutableState );
565+ }
457566 executions .put (executionId , mutableState );
458567
459568 PollWorkflowTaskQueueRequest eagerWorkflowTaskPollRequest =
@@ -1094,9 +1203,12 @@ public void requestCancelWorkflowExecution(
10941203 void requestCancelWorkflowExecution (
10951204 RequestCancelWorkflowExecutionRequest cancelRequest ,
10961205 Optional <TestWorkflowMutableStateImpl .CancelExternalWorkflowExecutionCallerInfo > callerInfo ) {
1097- ExecutionId executionId =
1098- new ExecutionId (cancelRequest .getNamespace (), cancelRequest .getWorkflowExecution ());
1099- TestWorkflowMutableState mutableState = getMutableState (executionId );
1206+ TestWorkflowMutableState mutableState =
1207+ getMutableState (
1208+ cancelRequest .getNamespace (),
1209+ cancelRequest .getWorkflowExecution (),
1210+ cancelRequest .getFirstExecutionRunId (),
1211+ true );
11001212 mutableState .requestCancelWorkflowExecution (cancelRequest , callerInfo );
11011213 }
11021214
@@ -1114,9 +1226,12 @@ public void terminateWorkflowExecution(
11141226 }
11151227
11161228 private void terminateWorkflowExecution (TerminateWorkflowExecutionRequest request ) {
1117- ExecutionId executionId =
1118- new ExecutionId (request .getNamespace (), request .getWorkflowExecution ());
1119- TestWorkflowMutableState mutableState = getMutableState (executionId );
1229+ TestWorkflowMutableState mutableState =
1230+ getMutableState (
1231+ request .getNamespace (),
1232+ request .getWorkflowExecution (),
1233+ request .getFirstExecutionRunId (),
1234+ true );
11201235 mutableState .terminateWorkflowExecution (request );
11211236 }
11221237
0 commit comments