@@ -51,20 +51,21 @@ class CheckpointBatcher {
5151 this .callback = callback ;
5252 this .checkpointToken = checkpointToken ;
5353 this .checkpointApiRequestBatcher = new ApiRequestBatcher <>(
54- MAX_ITEM_COUNT , MAX_BATCH_SIZE_BYTES , CheckpointBatcher ::estimateSize , this ::doBatchAction );
54+ MAX_ITEM_COUNT , MAX_BATCH_SIZE_BYTES , CheckpointBatcher ::estimateSize , this ::checkpointBatch );
5555 }
5656
57+ /** Queues a checkpoint request for batched execution */
5758 CompletableFuture <Void > checkpoint (OperationUpdate update ) {
5859 logger .debug ("Checkpoint request received: Action {}" , update .action ());
5960 return checkpointApiRequestBatcher .submit (update , config .getCheckpointDelay ());
6061 }
6162
62- /** Poll for updates of the specified operation with preconfigured intervals */
63+ /** Polls for updates of the specified operation with preconfigured intervals */
6364 CompletableFuture <Operation > pollForUpdate (String operationId ) {
6465 return pollForUpdate (operationId , config .getPollingInterval ());
6566 }
6667
67- /** Poll for updates of the specified operation with specified delay */
68+ /** Polls for updates of the specified operation with specified delay */
6869 CompletableFuture <Operation > pollForUpdate (String operationId , Duration delay ) {
6970 logger .debug ("Polling request received: operation id {}" , operationId );
7071 var future = new CompletableFuture <Operation >();
@@ -83,6 +84,7 @@ CompletableFuture<Operation> pollForUpdate(String operationId, Duration delay) {
8384 return future ;
8485 }
8586
87+ /** Cancels all polling futures and waits for all pending checkpoint requests to complete */
8688 void shutdown () {
8789 // complete all polling futures with an exception
8890 List <List <CompletableFuture <Operation >>> allFutures ;
@@ -99,7 +101,10 @@ void shutdown() {
99101 checkpointApiRequestBatcher .shutdown ();
100102 }
101103
102- /** Calling GetExecutionState API to get all pages of operations given the nextMarker */
104+ /**
105+ * Calling GetExecutionState API to get all pages of operations given CheckpointUpdatedExecutionState(operations,
106+ * nextMarker)
107+ */
103108 public List <Operation > fetchAllPages (CheckpointUpdatedExecutionState checkpointUpdatedExecutionState ) {
104109 List <Operation > operations = new ArrayList <>();
105110 if (checkpointUpdatedExecutionState == null ) {
@@ -123,8 +128,7 @@ public List<Operation> fetchAllPages(CheckpointUpdatedExecutionState checkpointU
123128 return operations ;
124129 }
125130
126- private void doBatchAction (List <OperationUpdate > updates ) {
127- // doBatchAction can be called concurrently from ApiRequestBatcher.
131+ private void checkpointBatch (List <OperationUpdate > updates ) {
128132 synchronized (pollingFutures ) {
129133 // filter the null values from pollers
130134 var request = updates .stream ().filter (Objects ::nonNull ).toList ();
0 commit comments