1212import org .opensearch .dataprepper .model .record .Record ;
1313import org .opensearch .dataprepper .model .source .coordinator .enhanced .EnhancedSourceCoordinator ;
1414import org .opensearch .dataprepper .plugins .source .source_crawler .coordination .partition .LeaderPartition ;
15+ import org .opensearch .dataprepper .plugins .source .source_crawler .coordination .partition .SaasSourcePartition ;
16+ import org .opensearch .dataprepper .plugins .source .source_crawler .coordination .state .PaginationCrawlerWorkerProgressState ;
1517import org .opensearch .dataprepper .plugins .source .source_crawler .coordination .state .TokenPaginationCrawlerLeaderProgressState ;
1618import org .opensearch .dataprepper .plugins .source .source_crawler .model .ItemInfo ;
1719import org .slf4j .Logger ;
2527import java .util .List ;
2628import java .util .concurrent .TimeUnit ;
2729import java .util .concurrent .atomic .AtomicBoolean ;
30+ import java .util .stream .Collectors ;
2831
2932@ Named
3033public class LeaderOnlyTokenCrawler implements Crawler <SaasWorkerProgressState > {
@@ -54,7 +57,6 @@ public class LeaderOnlyTokenCrawler implements Crawler<SaasWorkerProgressState>
5457 private final Timer bufferWriteTimer ;
5558
5659 private String lastToken ;
57- private boolean shouldStopCrawl = false ;
5860 private Duration noAckTimeout ;
5961
6062 public LeaderOnlyTokenCrawler (
@@ -73,7 +75,6 @@ public LeaderOnlyTokenCrawler(
7375 @ Override
7476 public Instant crawl (LeaderPartition leaderPartition ,
7577 EnhancedSourceCoordinator coordinator ) {
76- shouldStopCrawl = false ;
7778 long startTime = System .currentTimeMillis ();
7879 Instant lastCheckpointTime = Instant .now ();
7980 TokenPaginationCrawlerLeaderProgressState leaderProgressState =
@@ -84,7 +85,7 @@ public Instant crawl(LeaderPartition leaderPartition,
8485
8586 Iterator <ItemInfo > itemIterator = ((LeaderOnlyTokenCrawlerClient ) client ).listItems (lastToken );
8687
87- while (itemIterator .hasNext () && ! shouldStopCrawl ) {
88+ while (itemIterator .hasNext ()) {
8889 List <ItemInfo > batch = collectBatch (itemIterator );
8990 if (batch .isEmpty ()) {
9091 continue ;
@@ -148,12 +149,12 @@ private void processBatch(List<ItemInfo> batch,
148149 if (success ) {
149150 // On success: update checkpoint
150151 acknowledgementSetSuccesses .increment ();
151- updateLeaderProgressState (leaderPartition , lastToken , coordinator );
152152 } else {
153- // On failure: Stop the crawl
153+ // On failure: Create a retry partition
154154 acknowledgementSetFailures .increment ();
155- log .warn ("Batch processing received negative acknowledgment for token: {}. Stopping current crawl." , lastToken );
156- shouldStopCrawl = true ;
155+ log .warn ("Batch processing received negative acknowledgment for token: {}. Creating retry " +
156+ "partition." , lastToken );
157+ createRetryPartition (batch , coordinator );
157158 }
158159 },
159160 noAckTimeout
@@ -172,17 +173,19 @@ private void processBatch(List<ItemInfo> batch,
172173
173174 if (!ackWaitDuration .minus (noAckTimeout ).isNegative ()) {
174175 // No ack received within NO_ACK_TIME_OUT_SECONDS
175- log .warn ("Acknowledgment not received for batch with token {} past wait time. Stopping current crawl ." , lastToken );
176- shouldStopCrawl = true ;
176+ log .warn ("No acknowledgment received for batch with token: {}. Creating retry partition ." , lastToken );
177+ createRetryPartition ( batch , coordinator ) ;
177178 break ;
178179 }
179180 }
181+ updateLeaderProgressState (leaderPartition , lastToken , coordinator );
180182 } catch (InterruptedException e ) {
181183 Thread .currentThread ().interrupt ();
182184 throw new RuntimeException ("Interrupted while waiting for acknowledgment" , e );
183185 } catch (Exception e ) {
184186 log .error ("Failed to process batch ending with token {}" , lastToken , e );
185187 acknowledgementSet .complete ();
188+ createRetryPartition (batch , coordinator );
186189 throw e ;
187190 }
188191 });
@@ -201,6 +204,23 @@ private void processBatch(List<ItemInfo> batch,
201204 }
202205 }
203206
207+ private void createRetryPartition (List <ItemInfo > itemInfoList , EnhancedSourceCoordinator coordinator ) {
208+ if (itemInfoList .isEmpty ()) {
209+ return ;
210+ }
211+ ItemInfo itemInfo = itemInfoList .get (0 );
212+ String partitionKey = itemInfo .getPartitionKey ();
213+ List <String > itemIds = itemInfoList .stream ().map (ItemInfo ::getId ).collect (Collectors .toList ());
214+ PaginationCrawlerWorkerProgressState state = new PaginationCrawlerWorkerProgressState ();
215+ state .setKeyAttributes (itemInfo .getKeyAttributes ());
216+ state .setItemIds (itemIds );
217+ state .setExportStartTime (Instant .now ());
218+ state .setLoadedItems (itemInfoList .size ());
219+ SaasSourcePartition sourcePartition = new SaasSourcePartition (state , partitionKey );
220+ coordinator .createPartition (sourcePartition );
221+ }
222+
223+
204224 private void updateLeaderProgressState (LeaderPartition leaderPartition ,
205225 String updatedToken ,
206226 EnhancedSourceCoordinator coordinator ) {
0 commit comments