99import com .amazonaws .lambda .durable .exception .SerDesException ;
1010import com .amazonaws .lambda .durable .exception .UnrecoverableDurableExecutionException ;
1111import com .amazonaws .lambda .durable .execution .ExecutionManager ;
12- import com .amazonaws .lambda .durable .execution .ExecutionPhase ;
1312import com .amazonaws .lambda .durable .execution .ThreadType ;
1413import com .amazonaws .lambda .durable .serde .SerDes ;
1514import com .amazonaws .lambda .durable .util .ExceptionHelper ;
1615import java .time .Duration ;
1716import java .util .Objects ;
1817import java .util .concurrent .CompletableFuture ;
19- import java .util .concurrent .Phaser ;
2018import org .slf4j .Logger ;
2119import org .slf4j .LoggerFactory ;
2220import software .amazon .awssdk .services .lambda .model .ErrorObject ;
2321import software .amazon .awssdk .services .lambda .model .Operation ;
22+ import software .amazon .awssdk .services .lambda .model .OperationStatus ;
2423import software .amazon .awssdk .services .lambda .model .OperationType ;
2524import software .amazon .awssdk .services .lambda .model .OperationUpdate ;
2625
3938 * <ul>
4039 * <li>Starting multiple async operations quickly
4140 * <li>Blocking on results later when needed
42- * <li>Proper thread coordination via Phasers
41+ * <li>Proper thread coordination via future
4342 * </ul>
4443 */
4544public abstract class BaseDurableOperation <T > implements DurableFuture <T > {
@@ -51,7 +50,7 @@ public abstract class BaseDurableOperation<T> implements DurableFuture<T> {
5150 private final ExecutionManager executionManager ;
5251 private final TypeToken <T > resultTypeToken ;
5352 private final SerDes resultSerDes ;
54- private final Phaser phaser ;
53+ private final CompletableFuture < Void > completionFuture ;
5554
5655 public BaseDurableOperation (
5756 String operationId ,
@@ -67,8 +66,10 @@ public BaseDurableOperation(
6766 this .resultTypeToken = resultTypeToken ;
6867 this .resultSerDes = resultSerDes ;
6968
70- // todo: phaser could be used only in ExecutionManager and invisible from operations.
71- this .phaser = executionManager .startPhaser (operationId );
69+ this .completionFuture = new CompletableFuture <>();
70+
71+ // register this operation in ExecutionManager so that the operation can receive updates from ExecutionManager
72+ executionManager .registerOperation (this );
7273 }
7374
7475 /** Gets the unique identifier for this operation. */
@@ -96,7 +97,7 @@ public OperationType getType() {
9697 *
9798 * <ul>
9899 * <li>Thread deregistration (allows suspension)
99- * <li>Phaser blocking (waits for operation to complete)
100+ * <li>Blocking (waits for operation to complete)
100101 * <li>Thread reactivation (resumes execution)
101102 * <li>Result retrieval
102103 * </ul>
@@ -116,7 +117,7 @@ protected Operation getOperation() {
116117 }
117118
118119 /**
119- * check if it's called from a Step.
120+ * Checks if it's called from a Step.
120121 *
121122 * @throws IllegalDurableOperationException if it's in a step
122123 */
@@ -131,43 +132,43 @@ private void validateCurrentThreadType() {
131132 }
132133 }
133134
134- // phase control utilities
135+ /** Checks if this operation is completed */
136+ protected boolean isOperationCompleted () {
137+ return completionFuture .isDone ();
138+ }
139+
140+ /** Waits for the operation to complete and suspends the execution if no active thread is running */
135141 protected Operation waitForOperationCompletion () {
136142
137143 validateCurrentThreadType ();
138144
139- // register to prevent the state from advancing
140- phaser .register ();
141-
142- // If we are in a replay where the operation is already complete (SUCCEEDED /
143- // FAILED), the Phaser will be
144- // advanced in .execute() already and we don't block but return the result
145- // immediately.
146- if (phaser .getPhase () == ExecutionPhase .RUNNING .getValue ()) {
147- // Operation not done yet
148- var context = executionManager .getCurrentContext ();
149- // Deregister current context - allows suspension
150- logger .debug (
151- "get() on {} attempting to deregister context: {}" ,
152- getType (),
153- executionManager .getCurrentContext ().contextId ());
154- deregisterActiveThreadAndUnsetCurrentContext (context .contextId ());
155-
156- // Block until operation completes
157- logger .trace ("Waiting for operation to finish {} (Phaser: {})" , getOperationId (), phaser );
158- phaser .arriveAndAwaitAdvance ();
159-
160- // Reactivate current context
161- registerActiveThread (context .contextId (), context .threadType ());
162- setCurrentContext (context .contextId (), context .threadType ());
163-
164- // Complete phase 1
165- phaser .arriveAndDeregister ();
166- } else {
167- // The phaser is already completed. Deregister now.
168- phaser .arriveAndDeregister ();
145+ var context = executionManager .getCurrentContext ();
146+
147+ // Use a synchronized block here to prevent the completionFuture from being completed by the execution thread
148+ // (a step or child context thread) when it's inside the `if` block where the completion check is done (not
149+ // completed) while the callback isn't added to the completionFuture or the current thread isn't deregistered.
150+ synchronized (completionFuture ) {
151+ if (!isOperationCompleted ()) {
152+ // Operation not done yet
153+ logger .debug ("get() on {} attempting to deregister context: {}" , getType (), context .contextId ());
154+
155+ // Add a callback to completionFuture so that when the completionFuture is completed,
156+ // it will register the current Context thread synchronously to make sure it is always registered
157+ // before the execution thread (Step or child context) is deregistered.
158+ completionFuture .thenRun (() -> registerActiveThread (context .contextId (), context .threadType ()));
159+
160+ // Deregister the current thread to allow suspension
161+ deregisterActiveThreadAndUnsetCurrentContext (context .contextId ());
162+ }
169163 }
170164
165+ // Block until operation completes. No-op if the future is already completed.
166+ logger .trace ("Waiting for operation to finish {} ({})" , getOperationId (), completionFuture );
167+ completionFuture .join ();
168+
169+ // Reactivate current context. No-op if this is called twice.
170+ setCurrentContext (context .contextId (), context .threadType ());
171+
171172 // Get result based on status
172173 var op = getOperation ();
173174 if (op == null ) {
@@ -177,13 +178,27 @@ protected Operation waitForOperationCompletion() {
177178 return op ;
178179 }
179180
181+ /** Receives operation updates from ExecutionManager and updates the internal state of the operation */
182+ public void onCheckpointComplete (Operation operation ) {
183+ if (isTerminalStatus (operation .status ())) {
184+ synchronized (completionFuture ) {
185+ logger .trace ("In onCheckpointComplete, completing operation {} ({})" , operationId , completionFuture );
186+ completionFuture .complete (null );
187+ }
188+ }
189+ }
190+
191+ /** Marks the operation as already completed (in replay). */
180192 protected void markAlreadyCompleted () {
181- // Operation is already completed in a relay. We advance and deregister from the Phaser
182- // so that get method doesn't block and returns the result immediately.
183- logger .trace ("Detected terminal status during replay. Advancing phaser 0 -> 1 {}." , phaser );
184- phaser .arriveAndDeregister (); // Phase 0 -> 1
193+ // When the operation is already completed in a replay, we complete completionFuture immediately
194+ // so that the `get` method will be unblocked and the context thread will be registered
195+ logger .trace ("In markAlreadyCompleted, completing operation: {} ({})." , operationId , completionFuture );
196+ synchronized (completionFuture ) {
197+ completionFuture .complete (null );
198+ }
185199 }
186200
201+ // terminate the execution
187202 protected T terminateExecution (UnrecoverableDurableExecutionException exception ) {
188203 executionManager .terminateExecution (exception );
189204 // Exception is already thrown from above. Keep the throw statement below to make tests happy
@@ -194,7 +209,7 @@ protected T terminateExecutionWithIllegalDurableOperationException(String messag
194209 return terminateExecution (new IllegalDurableOperationException (message ));
195210 }
196211
197- // advanced phase control used by Step only
212+ // advanced thread and context control
198213 protected void deregisterActiveThreadAndUnsetCurrentContext (String threadId ) {
199214 executionManager .deregisterActiveThreadAndUnsetCurrentContext (threadId );
200215 }
@@ -298,4 +313,12 @@ protected void validateReplay(Operation checkpointed) {
298313 operationId , checkpointed .name (), getName ())));
299314 }
300315 }
316+
317+ private boolean isTerminalStatus (OperationStatus status ) {
318+ return status == OperationStatus .SUCCEEDED
319+ || status == OperationStatus .FAILED
320+ || status == OperationStatus .CANCELLED
321+ || status == OperationStatus .TIMED_OUT
322+ || status == OperationStatus .STOPPED ;
323+ }
301324}
0 commit comments