99import com .amazonaws .lambda .durable .exception .SerDesException ;
1010import com .amazonaws .lambda .durable .exception .UnrecoverableDurableExecutionException ;
1111import com .amazonaws .lambda .durable .execution .ExecutionManager ;
12- import com .amazonaws .lambda .durable .execution .ExecutionPhase ;
1312import com .amazonaws .lambda .durable .execution .ThreadType ;
1413import com .amazonaws .lambda .durable .serde .SerDes ;
1514import com .amazonaws .lambda .durable .util .ExceptionHelper ;
1615import java .time .Duration ;
1716import java .util .Objects ;
1817import java .util .concurrent .CompletableFuture ;
19- import java .util .concurrent .Phaser ;
2018import org .slf4j .Logger ;
2119import org .slf4j .LoggerFactory ;
2220import software .amazon .awssdk .services .lambda .model .ErrorObject ;
2321import software .amazon .awssdk .services .lambda .model .Operation ;
22+ import software .amazon .awssdk .services .lambda .model .OperationStatus ;
2423import software .amazon .awssdk .services .lambda .model .OperationType ;
2524import software .amazon .awssdk .services .lambda .model .OperationUpdate ;
2625
3938 * <ul>
4039 * <li>Starting multiple async operations quickly
4140 * <li>Blocking on results later when needed
42- * <li>Proper thread coordination via Phasers
41+ * <li>Proper thread coordination via a {@link CompletableFuture}
4342 * </ul>
4443 */
4544public abstract class BaseDurableOperation <T > implements DurableFuture <T > {
@@ -51,7 +50,7 @@ public abstract class BaseDurableOperation<T> implements DurableFuture<T> {
5150 private final ExecutionManager executionManager ;
5251 private final TypeToken <T > resultTypeToken ;
5352 private final SerDes resultSerDes ;
54- private final Phaser phaser ;
53+ private final CompletableFuture < Void > completionFuture ;
5554
5655 public BaseDurableOperation (
5756 String operationId ,
@@ -67,8 +66,10 @@ public BaseDurableOperation(
6766 this .resultTypeToken = resultTypeToken ;
6867 this .resultSerDes = resultSerDes ;
6968
70- // todo: phaser could be used only in ExecutionManager and invisible from operations.
71- this .phaser = executionManager .startPhaser (operationId );
69+ this .completionFuture = new CompletableFuture <>();
70+
71+ // register this operation in ExecutionManager so that the operation can receive updates from ExecutionManager
72+ executionManager .registerOperation (this );
7273 }
7374
7475 /** Gets the unique identifier for this operation. */
@@ -96,7 +97,7 @@ public OperationType getType() {
9697 *
9798 * <ul>
9899 * <li>Thread deregistration (allows suspension)
99- * <li>Phaser blocking (waits for operation to complete)
100+ * <li>Blocking (waits for operation to complete)
100101 * <li>Thread reactivation (resumes execution)
101102 * <li>Result retrieval
102103 * </ul>
@@ -116,7 +117,7 @@ protected Operation getOperation() {
116117 }
117118
118119 /**
119- * check if it's called from a Step.
120+ * Checks if it's called from a Step.
120121 *
121122 * @throws IllegalDurableOperationException if it's in a step
122123 */
@@ -131,43 +132,39 @@ private void validateCurrentThreadType() {
131132 }
132133 }
133134
134- // phase control utilities
135+ /** Checks if this operation is completed */
136+ protected boolean isOperationCompleted () {
137+ return completionFuture .isDone ();
138+ }
139+
140+ /** Waits for the operation to complete and suspends the execution if no active thread is running */
135141 protected Operation waitForOperationCompletion () {
136142
137143 validateCurrentThreadType ();
138144
139- // register to prevent the state from advancing
140- phaser .register ();
141-
142- // If we are in a replay where the operation is already complete (SUCCEEDED /
143- // FAILED), the Phaser will be
144- // advanced in .execute() already and we don't block but return the result
145- // immediately.
146- if (phaser .getPhase () == ExecutionPhase .RUNNING .getValue ()) {
147- // Operation not done yet
148- var context = executionManager .getCurrentContext ();
149- // Deregister current context - allows suspension
150- logger .debug (
151- "get() on {} attempting to deregister context: {}" ,
152- getType (),
153- executionManager .getCurrentContext ().contextId ());
154- deregisterActiveThreadAndUnsetCurrentContext (context .contextId ());
155-
156- // Block until operation completes
157- logger .trace ("Waiting for operation to finish {} (Phaser: {})" , getOperationId (), phaser );
158- phaser .arriveAndAwaitAdvance ();
159-
160- // Reactivate current context
161- registerActiveThread (context .contextId (), context .threadType ());
162- setCurrentContext (context .contextId (), context .threadType ());
163-
164- // Complete phase 1
165- phaser .arriveAndDeregister ();
166- } else {
167- // The phaser is already completed. Deregister now.
168- phaser .arriveAndDeregister ();
145+ var context = executionManager .getCurrentContext ();
146+ synchronized (completionFuture ) {
147+ if (!isOperationCompleted ()) {
148+ // Operation not done yet
149+ logger .debug ("get() on {} attempting to deregister context: {}" , getType (), context .contextId ());
150+
151+ // Add a callback to completionFuture so that when the completionFuture is completed,
152+ // register the current Context thread synchronously to make sure it is always registered
153+ // before the execution thread (Step or child context) is deregistered
154+ completionFuture .thenRun (() -> registerActiveThread (context .contextId (), context .threadType ()));
155+
156+ // Deregister the current thread to allow suspension
157+ deregisterActiveThreadAndUnsetCurrentContext (context .contextId ());
158+ }
169159 }
170160
161+ // Block until operation completes. No-op if the future is already completed.
162+ logger .trace ("Waiting for operation to finish {} ({})" , getOperationId (), completionFuture );
163+ completionFuture .join ();
164+
165+ // Reactivate current context. No-op if this is called twice.
166+ setCurrentContext (context .contextId (), context .threadType ());
167+
171168 // Get result based on status
172169 var op = getOperation ();
173170 if (op == null ) {
@@ -177,13 +174,27 @@ protected Operation waitForOperationCompletion() {
177174 return op ;
178175 }
179176
177+ /** Receives operation updates from ExecutionManager and updates the internal state of the operation */
178+ public void onCheckpointComplete (Operation operation ) {
179+ if (isTerminalStatus (operation .status ())) {
180+ synchronized (completionFuture ) {
181+ logger .trace ("In onCheckpointComplete, completing operation {} ({})" , operationId , completionFuture );
182+ completionFuture .complete (null );
183+ }
184+ }
185+ }
186+
187+ /** Marks the operation as already completed (in replay). */
180188 protected void markAlreadyCompleted () {
181- // Operation is already completed in a relay. We advance and deregister from the Phaser
182- // so that get method doesn't block and returns the result immediately.
183- logger .trace ("Detected terminal status during replay. Advancing phaser 0 -> 1 {}." , phaser );
184- phaser .arriveAndDeregister (); // Phase 0 -> 1
189+ // When the operation is already completed in a replay, we complete completionFuture immediately
190+ // so that the `get` method will be unblocked and the context thread will be registered
191+ logger .trace ("In markAlreadyCompleted, completing operation: {} ({})." , operationId , completionFuture );
192+ synchronized (completionFuture ) {
193+ completionFuture .complete (null );
194+ }
185195 }
186196
197+ // terminate the execution
187198 protected T terminateExecution (UnrecoverableDurableExecutionException exception ) {
188199 executionManager .terminateExecution (exception );
189200 // Exception is already thrown from above. Keep the throw statement below to make tests happy
@@ -194,7 +205,7 @@ protected T terminateExecutionWithIllegalDurableOperationException(String messag
194205 return terminateExecution (new IllegalDurableOperationException (message ));
195206 }
196207
197- // advanced phase control used by Step only
208+ // advanced thread and context control
/**
 * Forwards to {@code ExecutionManager#deregisterActiveThreadAndUnsetCurrentContext} with the given id.
 *
 * @param threadId identifier handed to the execution manager; the visible caller passes a context id
 *     (NOTE(review): presumably thread id and context id are the same value — confirm)
 */
198209 protected void deregisterActiveThreadAndUnsetCurrentContext (String threadId ) {
199210 executionManager .deregisterActiveThreadAndUnsetCurrentContext (threadId );
200211 }
@@ -298,4 +309,12 @@ protected void validateReplay(Operation checkpointed) {
298309 operationId , checkpointed .name (), getName ())));
299310 }
300311 }
312+
313+ private boolean isTerminalStatus (OperationStatus status ) {
314+ return status == OperationStatus .SUCCEEDED
315+ || status == OperationStatus .FAILED
316+ || status == OperationStatus .CANCELLED
317+ || status == OperationStatus .TIMED_OUT
318+ || status == OperationStatus .STOPPED ;
319+ }
301320}
0 commit comments