@@ -72,6 +72,7 @@ CompletableFuture<Operation> pollForUpdate(String operationId) {
7272 logger .debug ("Polling request received: operation id {}" , operationId );
7373 var future = new CompletableFuture <Operation >();
7474 synchronized (pollingFutures ) {
75+ // register the future in pollingFutures, which will be completed by the polling thread
7576 pollingFutures
7677 .computeIfAbsent (operationId , k -> Collections .synchronizedList (new ArrayList <>()))
7778 .add (future );
@@ -120,25 +121,28 @@ private void processQueue() {
120121 }
121122
122123 protected CompletableFuture <Void > doBatchAction (List <OperationUpdate > updates ) {
123- logger .debug ("Calling durable API checkpointDurableExecution with {} updates" , updates .size ());
124- var response = client .checkpoint (durableExecutionArn , checkpointToken , updates );
125- logger .debug ("Durable API checkpointDurableExecution called: {}." , response );
124+ // doBatchAction will be called from the polling thread and also from AsyncBatcher.
125+ // Use synchronized here to make sure no concurrent checkpoint API calls
126+ synchronized (pollingFutures ) {
127+ logger .debug ("Calling durable API checkpointDurableExecution with {} updates" , updates .size ());
128+ var response = client .checkpoint (durableExecutionArn , checkpointToken , updates );
129+ logger .debug ("Durable API checkpointDurableExecution called: {}." , response );
126130
127- // Notify callback of completion
128- // TODO: sam local backend returns no new execution state when called with zero
129- // updates. WHY?
130- // This means the polling will never receive an operation update and complete
131- // the Phaser.
132- checkpointToken = response .checkpointToken ();
133- if (response .newExecutionState () != null ) {
134- var operations = fetchAllPages (
135- response .newExecutionState ().operations (),
136- response .newExecutionState ().nextMarker ());
137- if (!operations .isEmpty ()) {
138- callback .accept (operations );
139- }
131+ // Notify callback of completion
132+ // TODO: sam local backend returns no new execution state when called with zero
133+ // updates. WHY?
134+ // This means the polling will never receive an operation update and complete
135+ // the Phaser.
136+ checkpointToken = response .checkpointToken ();
137+ if (response .newExecutionState () != null ) {
138+ var operations = fetchAllPages (
139+ response .newExecutionState ().operations (),
140+ response .newExecutionState ().nextMarker ());
141+ if (!operations .isEmpty ()) {
142+ callback .accept (operations );
143+ }
140144
141- synchronized ( pollingFutures ) {
145+ // complete the registered pollingFutures
142146 for (var operation : operations ) {
143147 var pollers = pollingFutures .remove (operation .id ());
144148 if (pollers != null ) {
@@ -150,7 +154,7 @@ protected CompletableFuture<Void> doBatchAction(List<OperationUpdate> updates) {
150154 return CompletableFuture .completedFuture (null );
151155 }
152156
153- public static int estimateSize (OperationUpdate update ) {
157+ private static int estimateSize (OperationUpdate update ) {
154158 if (update == null ) {
155159 return 0 ;
156160 }
0 commit comments