1616import java .util .concurrent .Executor ;
1717import java .util .concurrent .Executors ;
1818import java .util .concurrent .Phaser ;
19+ import java .util .concurrent .atomic .AtomicReference ;
1920import org .slf4j .Logger ;
2021import org .slf4j .LoggerFactory ;
2122import software .amazon .awssdk .services .lambda .model .Operation ;
@@ -46,10 +47,10 @@ public class ExecutionManager {
4647 private final String executionOperationId ;
4748 private volatile String checkpointToken ;
4849 private final String durableExecutionArn ;
49- private volatile ExecutionMode executionMode ;
50+ private final AtomicReference < ExecutionMode > executionMode ;
5051
5152 // ===== Current Operation Context (for logging) =====
52- private static final ThreadLocal <OperationContext > currentOperation = new ThreadLocal <>();
53+ private static final ThreadLocal <OperationContext > currentOperation = new InheritableThreadLocal <>();
5354
5455 // ===== Thread Coordination =====
5556 private final Map <String , ThreadType > activeThreads = Collections .synchronizedMap (new HashMap <>());
@@ -76,7 +77,7 @@ public ExecutionManager(
7677 loadAllOperations (initialExecutionState );
7778
7879 // Start in REPLAY mode if we have more than just the initial EXECUTION operation
79- this .executionMode = operations .size () > 1 ? ExecutionMode .REPLAY : ExecutionMode .EXECUTION ;
80+ this .executionMode = new AtomicReference <>( operations .size () > 1 ? ExecutionMode .REPLAY : ExecutionMode .EXECUTION ) ;
8081
8182 this .managedExecutor = executor ;
8283
@@ -114,7 +115,7 @@ public String getDurableExecutionArn() {
114115 }
115116
116117 public boolean isReplaying () {
117- return executionMode == ExecutionMode .REPLAY ;
118+ return executionMode . get () == ExecutionMode .REPLAY ;
118119 }
119120
120121 public String getCheckpointToken () {
@@ -152,9 +153,10 @@ public Operation getOperation(String operationId) {
152153 */
153154 public Operation getOperationAndUpdateReplayState (String operationId ) {
154155 var existing = operations .get (operationId );
155- if (executionMode == ExecutionMode .REPLAY && existing == null ) {
156- executionMode = ExecutionMode .EXECUTION ;
157- logger .debug ("Transitioned to EXECUTION mode at operation '{}'" , operationId );
156+ if (existing == null ) {
157+ if (executionMode .compareAndSet (ExecutionMode .REPLAY , ExecutionMode .EXECUTION )) {
158+ logger .debug ("Transitioned to EXECUTION mode at operation '{}'" , operationId );
159+ }
158160 }
159161 return existing ;
160162 }
@@ -337,6 +339,7 @@ public CompletableFuture<Void> getSuspendExecutionFuture() {
337339
338340 public void shutdown () {
339341 checkpointBatcher .shutdown ();
342+ currentOperation .remove ();
340343 }
341344
342345 private boolean isTerminalStatus (OperationStatus status ) {
0 commit comments