@@ -84,9 +84,6 @@ CompletableFuture<Operation> pollForUpdate(String operationId, Duration delay) {
8484 }
8585
8686 void shutdown () {
87- // wait for all checkpoint requests to complete
88- checkpointApiRequestBatcher .shutdown ();
89-
9087 // complete all polling futures with an exception
9188 List <List <CompletableFuture <Operation >>> allFutures ;
9289 synchronized (pollingFutures ) {
@@ -97,6 +94,9 @@ void shutdown() {
9794 for (var futures : allFutures ) {
9895 futures .forEach (f -> f .completeExceptionally (new IllegalStateException ("CheckpointManager shutdown" )));
9996 }
97+
98+ // wait for all checkpoint requests to complete
99+ checkpointApiRequestBatcher .shutdown ();
100100 }
101101
102102 /** Calling GetExecutionState API to get all pages of operations given the nextMarker */
@@ -122,13 +122,14 @@ public List<Operation> fetchAllPages(CheckpointUpdatedExecutionState checkpointU
122122 private void doBatchAction (List <OperationUpdate > updates ) {
123123 // doBatchAction can be called concurrently from ApiRequestBatcher.
124124 synchronized (pollingFutures ) {
125- if (pollingFutures .isEmpty () && updates .isEmpty ()) {
126- return ;
127- }
128-
129125 // filter the null values from pollers
130126 var request = updates .stream ().filter (Objects ::nonNull ).toList ();
131127
128+ if (pollingFutures .isEmpty () && request .isEmpty ()) {
129+ // ignore the batch if no pollers and no data to checkpoint
130+ return ;
131+ }
132+
132133 logger .debug ("Calling durable API checkpointDurableExecution with {} updates" , request .size ());
133134 var response = config .getDurableExecutionClient ().checkpoint (durableExecutionArn , checkpointToken , request );
134135 logger .debug ("Durable API checkpointDurableExecution called: {}." , response );
0 commit comments