1919import software .amazon .awssdk .services .lambda .model .OperationAction ;
2020import software .amazon .awssdk .services .lambda .model .OperationStatus ;
2121import software .amazon .awssdk .services .lambda .model .OperationType ;
22+ import software .amazon .awssdk .services .lambda .model .OperationUpdate ;
2223import software .amazon .lambda .durable .DurableConfig ;
2324import software .amazon .lambda .durable .TestUtils ;
2425import software .amazon .lambda .durable .TypeToken ;
@@ -44,14 +45,21 @@ class ParallelOperationTest {
4445
4546 private DurableContextImpl durableContext ;
4647 private ExecutionManager executionManager ;
48+ // Thread-safe backing store for getOperationAndUpdateReplayState.
49+ // Tests pre-populate this; doAnswer writes here before firing onCheckpointComplete,
50+ // guaranteeing visibility to any thread that reads after the future unblocks.
51+ private ConcurrentHashMap <String , Operation > operationStore ;
4752
4853 @ BeforeEach
4954 void setUp () {
5055 durableContext = mock (DurableContextImpl .class );
5156 executionManager = mock (ExecutionManager .class );
57+ operationStore = new ConcurrentHashMap <>();
5258
5359 when (executionManager .getCurrentThreadContext ()).thenReturn (new ThreadContext (null , ThreadType .CONTEXT ));
54- when (executionManager .getOperationAndUpdateReplayState (anyString ())).thenReturn (null );
60+ // Delegate to operationStore so all reads see the latest write, regardless of thread.
61+ when (executionManager .getOperationAndUpdateReplayState (anyString ()))
62+ .thenAnswer (inv -> operationStore .get (inv .getArgument (0 )));
5563
5664 var childContext = mock (DurableContextImpl .class );
5765 when (childContext .getExecutionManager ()).thenReturn (executionManager );
@@ -77,9 +85,10 @@ void setUp() {
7785 .when (executionManager )
7886 .registerOperation (any ());
7987
80- // Simulate the real backend for all sendOperationUpdate calls:
81- // - For SUCCEED on the parallel op: update the stub and fire onCheckpointComplete to unblock join().
82- // - For everything else (START, child checkpoints): just return a completed future.
88+ // Simulate the real backend for all sendOperationUpdate calls.
89+ // For SUCCEED on the parallel op: write to operationStore first (a ConcurrentHashMap put
90+ // happens-before any later get that observes it), then fire onCheckpointComplete to unblock join().
91+ // This ordering guarantees getOperationAndUpdateReplayState() never returns null after unblocking.
8392 var succeededParallelOp = Operation .builder ()
8493 .id (OPERATION_ID )
8594 .name ("test-parallel" )
@@ -88,11 +97,11 @@ void setUp() {
8897 .status (OperationStatus .SUCCEEDED )
8998 .build ();
9099 doAnswer (inv -> {
91- var update = (software . amazon . awssdk . services . lambda . model . OperationUpdate ) inv .getArgument (0 );
100+ var update = (OperationUpdate ) inv .getArgument (0 );
92101
93102 if (OPERATION_ID .equals (update .id ()) && update .action () == OperationAction .SUCCEED ) {
94- when ( executionManager . getOperationAndUpdateReplayState ( OPERATION_ID ))
95- . thenReturn ( succeededParallelOp );
103+ // Write before completing the future — ConcurrentHashMap guarantees visibility.
104+ operationStore . put ( OPERATION_ID , succeededParallelOp );
96105 var op = registeredOps .get (OPERATION_ID );
97106 if (op != null ) {
98107 op .onCheckpointComplete (succeededParallelOp );
0 commit comments