2121import software .amazon .lambda .durable .TypeToken ;
2222import software .amazon .lambda .durable .config .RunInChildContextConfig ;
2323import software .amazon .lambda .durable .context .DurableContextImpl ;
24+ import software .amazon .lambda .durable .exception .UnrecoverableDurableExecutionException ;
2425import software .amazon .lambda .durable .execution .OperationIdGenerator ;
26+ import software .amazon .lambda .durable .execution .SuspendExecutionException ;
2527import software .amazon .lambda .durable .execution .ThreadType ;
2628import software .amazon .lambda .durable .model .ConcurrencyCompletionStatus ;
2729import software .amazon .lambda .durable .model .OperationIdentifier ;
2830import software .amazon .lambda .durable .model .OperationSubType ;
2931import software .amazon .lambda .durable .serde .SerDes ;
32+ import software .amazon .lambda .durable .util .ExceptionHelper ;
3033
3134/**
3235 * Abstract base class for concurrent execution of multiple child context operations.
@@ -143,7 +146,7 @@ protected <R> ChildContextOperation<R> enqueueItem(
143146 }
144147
145148 private void notifyConsumerThread () {
146- synchronized (this ) {
149+ synchronized (completionFuture ) {
147150 consumerThreadListener .get ().complete (null );
148151 }
149152 }
@@ -156,61 +159,80 @@ protected void executeItems() {
156159 AtomicInteger failedCount = new AtomicInteger (0 );
157160
158161 Runnable consumer = () -> {
159- while (true ) {
160- // Set a new future if it's completed so that it will be able to receive a notification of
161- // new items when the thread is checking completion condition and processing
162- // the queued items below.
163- synchronized (this ) {
164- if (consumerThreadListener .get () != null
165- && consumerThreadListener .get ().isDone ()) {
166- consumerThreadListener .set (new CompletableFuture <>());
162+ try {
163+ while (true ) {
164+ // Set a new future if it's completed so that it will be able to receive a notification of
165+ // new items when the thread is checking completion condition and processing
166+ // the queued items below.
167+ synchronized (completionFuture ) {
168+ if (consumerThreadListener .get () != null
169+ && consumerThreadListener .get ().isDone ()) {
170+ consumerThreadListener .set (new CompletableFuture <>());
171+ }
167172 }
168- }
169173
170- // Process completion condition. Quit the loop if the condition is met.
171- if (isOperationCompleted ()) {
172- return ;
173- }
174- var completionStatus = canComplete (succeededCount , failedCount , runningChildren );
175- if (completionStatus != null ) {
176- handleCompletion (completionStatus );
177- return ;
178- }
174+ // Process completion condition. Quit the loop if the condition is met.
175+ if (isOperationCompleted ()) {
176+ return ;
177+ }
178+ var completionStatus = canComplete (succeededCount , failedCount , runningChildren );
179+ if (completionStatus != null ) {
180+ handleCompletion (completionStatus );
181+ return ;
182+ }
179183
180- // process new items in the queue
181- while (runningChildren .size () < maxConcurrency && !pendingQueue .isEmpty ()) {
182- var next = pendingQueue .poll ();
183- runningChildren .add (next );
184- logger .debug ("Executing operation {}" , next .getName ());
185- next .execute ();
186- }
184+ // process new items in the queue
185+ while (runningChildren .size () < maxConcurrency && !pendingQueue .isEmpty ()) {
186+ var next = pendingQueue .poll ();
187+ runningChildren .add (next );
188+ logger .debug ("Executing operation {}" , next .getName ());
189+ next .execute ();
190+ }
187191
188- // If consumerThreadListener has been completed when processing above, waitForChildCompletion will
189- // immediately return null and repeat the above again
190- var child = waitForChildCompletion (succeededCount , failedCount , runningChildren );
191-
192- // child may be null if the consumer thread is woken up due to new items added or completion condition
193- // changed
194- if (child != null ) {
195- if (runningChildren .contains (child )) {
196- runningChildren .remove (child );
197- onItemComplete (succeededCount , failedCount , (ChildContextOperation <?>) child );
198- } else {
199- throw new IllegalStateException ("Unexpected completion: " + child );
192+ // If consumerThreadListener has been completed when processing above, waitForChildCompletion will
193+ // immediately return null and repeat the above again
194+ var child = waitForChildCompletion (succeededCount , failedCount , runningChildren );
195+
196+ // child may be null if the consumer thread is woken up due to new items added or completion
197+ // condition
198+ // changed
199+ if (child != null ) {
200+ if (runningChildren .contains (child )) {
201+ runningChildren .remove (child );
202+ onItemComplete (succeededCount , failedCount , (ChildContextOperation <?>) child );
203+ } else {
204+ throw new IllegalStateException ("Unexpected completion: " + child );
205+ }
200206 }
201207 }
208+ } catch (Throwable ex ) {
209+ handleException (ex );
202210 }
203211 };
204212 // run consumer in the user thread pool, although it's not a real user thread
205213 runUserHandler (consumer , getOperationId (), ThreadType .CONTEXT );
206214 }
207215
216+ private void handleException (Throwable ex ) {
217+ Throwable throwable = ExceptionHelper .unwrapCompletableFuture (ex );
218+ if (throwable instanceof SuspendExecutionException suspendExecutionException ) {
219+ // Rethrow Error immediately — do not checkpoint
220+ throw suspendExecutionException ;
221+ }
222+ if (throwable instanceof UnrecoverableDurableExecutionException unrecoverableDurableExecutionException ) {
223+ throw terminateExecution (unrecoverableDurableExecutionException );
224+ }
225+
226+ throw terminateExecutionWithIllegalDurableOperationException (
227+ String .format ("Unexpected exception in concurrency operation: %s" , throwable ));
228+ }
229+
208230 private BaseDurableOperation waitForChildCompletion (
209231 AtomicInteger succeededCount , AtomicInteger failedCount , Set <BaseDurableOperation > runningChildren ) {
210232 var threadContext = getCurrentThreadContext ();
211233 CompletableFuture <Object > future ;
212234
213- synchronized (this ) {
235+ synchronized (completionFuture ) {
214236 // check again in synchronized block to prevent race conditions
215237 if (isOperationCompleted ()) {
216238 return null ;
0 commit comments