1212import org .opensearch .dataprepper .plugins .source .source_crawler .base .Crawler ;
1313import org .opensearch .dataprepper .plugins .source .source_crawler .base .CrawlerSourceConfig ;
1414import org .opensearch .dataprepper .plugins .source .source_crawler .base .SaasWorkerProgressState ;
15+ import org .opensearch .dataprepper .plugins .source .source_crawler .coordination .state .DimensionalTimeSliceWorkerProgressState ;
1516import org .opensearch .dataprepper .plugins .source .source_crawler .coordination .partition .SaasSourcePartition ;
17+ import org .opensearch .dataprepper .plugins .source .source_crawler .exception .SaaSCrawlerException ;
1618
1719import com .google .common .annotations .VisibleForTesting ;
1820import org .slf4j .Logger ;
1921import org .slf4j .LoggerFactory ;
2022
2123import java .time .Duration ;
2224import java .util .Optional ;
25+ import java .time .Instant ;
26+
27+ import static org .opensearch .dataprepper .logging .DataPrepperMarkers .NOISY ;
2328
2429/**
2530 * Worker class for executing the partitioned work created while crawling a source.
@@ -75,10 +80,10 @@ public void run() {
7580 log .info ("Worker thread started" );
7681 log .info ("Processing Partitions" );
7782 while (!Thread .currentThread ().isInterrupted ()) {
83+ Optional <EnhancedSourcePartition > partition = Optional .empty ();
7884 try {
7985 // Get the next available partition from the coordinator
80- Optional <EnhancedSourcePartition > partition =
81- sourceCoordinator .acquireAvailablePartition (SaasSourcePartition .PARTITION_TYPE );
86+ partition = sourceCoordinator .acquireAvailablePartition (SaasSourcePartition .PARTITION_TYPE );
8287 if (partition .isPresent ()) {
8388 // Process the partition (source extraction logic)
8489 processPartition (partition .get (), buffer );
@@ -94,27 +99,86 @@ public void run() {
9499 }
95100 }
96101 } catch (Exception e ) {
97- // TODO: will be in a followup to handle retry strategy differently for non-retryable exceptions
98- backoffRetry (e );
102+ this .parititionsFailedCounter .increment ();
103+ // always default to backoffRetry strategy
104+ boolean shouldLocalRetry = true ;
105+ if (e instanceof SaaSCrawlerException ) {
106+ SaaSCrawlerException saasException = (SaaSCrawlerException ) e ;
107+ if (!saasException .isRetryable ()) {
108+ shouldLocalRetry = delayWorkerPartitionRetry (partition , e );
109+ }
110+ }
111+ if (shouldLocalRetry ) {
112+ backoffRetry (e );
113+ }
99114 }
100115 }
101116 log .warn ("SourceItemWorker Scheduler is interrupted, looks like shutdown has triggered" );
102117 }
103118
104119 /**
105- * Default behaviour of backoff retry workerScheduler by sleeping RETRY_BACKOFF_ON_EXCEPTION_MILLIS
120+ * Default behaviour of backoffRetry workerScheduler by sleeping RETRY_BACKOFF_ON_EXCEPTION_MILLIS
106121 * @param e - exception thrown by workerScheduler
107122 */
108123 private void backoffRetry (Exception e ) {
109- this .parititionsFailedCounter .increment ();
110- log .error ("Error processing partition" , e );
124+ log .error ("[Retryable Exception] Error processing partition" , e );
111125 try {
112126 Thread .sleep (RETRY_BACKOFF_ON_EXCEPTION_MILLIS );
113127 } catch (InterruptedException ex ) {
114128 log .warn ("Thread interrupted while waiting to retry due to {}" , ex .getMessage ());
115129 }
116130 }
117131
132+ /**
133+ * Delay retry on a workerPartition by X Duration (current default = 1 day) for all non-retryble exceptions up to X days (current default = 30 days)
134+ * @param sourcePartition - information on WorkerPartition state
135+ * @param ex - exception thrown by workerScheduler
136+ * @return boolean: true if we should fallback to localRetry
137+ */
138+ private boolean delayWorkerPartitionRetry (Optional <EnhancedSourcePartition > sourcePartition , Exception ex ) {
139+ log .error ("[Non-Retryable Exception] Error processing worker partition. Will delay retry with the configured duration" , ex );
140+ try {
141+ SaasSourcePartition workerPartition = (SaasSourcePartition ) sourcePartition .get ();
142+ boolean shouldLocalRetry = true ;
143+ if (workerPartition != null ) {
144+ SaasWorkerProgressState progressState = (SaasWorkerProgressState ) workerPartition .getProgressState ().get ();
145+ // TODO: ideally we should add partitionCreationTime for all type of SaasWorkerProgressState
146+ if (progressState instanceof DimensionalTimeSliceWorkerProgressState ) {
147+ DimensionalTimeSliceWorkerProgressState workerProgressState = (DimensionalTimeSliceWorkerProgressState ) progressState ;
148+ updateWorkerPartition (workerProgressState .getPartitionCreationTime (), workerPartition );
149+ shouldLocalRetry = false ;
150+ }
151+ }
152+
153+ // other SaasWorkerProgressState types (not DimensionalTimeSliceWorkerProgressState) should never use delayWorkerPartitionRetry()
154+ // to be safe, fallback to default retry strategy
155+ return shouldLocalRetry ;
156+ } catch (Exception e ) {
157+ log .error ("Error updating workerPartition " , e );
158+ // on exception, do not interrupt thread and retry again
159+ return false ;
160+ }
161+ }
162+
163+
164+ /**
165+ * Update the workerPartition if the partitionCreationTime <= max days to keep retrying (current default = 30 days) on nonretryable exceptions.
166+ * Otherwise, give up the workerPartition.
167+ * @param partitionCreationTime - timestamp in epoch when the worker partition was first created
168+ * @param workerPartition - information on WorkerPartition state
169+ */
170+ private void updateWorkerPartition (Instant partitionCreationTime , SaasSourcePartition workerPartition ) {
171+ log .info ("Updating workerPartition {}" , workerPartition .getPartitionKey ());
172+ Duration age = Duration .between (partitionCreationTime , Instant .now ());
173+ if (age .compareTo (this .sourceConfig .getDurationToGiveUpRetry ()) <= 0 ) {
174+ log .info (NOISY , "Partition {} is within or equal to the configured max duration, scheduling retry" , workerPartition .getPartitionKey ());
175+ sourceCoordinator .saveProgressStateForPartition (workerPartition , this .sourceConfig .getDurationToDelayRetry ());
176+ } else {
177+ log .info ("Partition {} is older than the configured max duration, giving up" , workerPartition .getPartitionKey ());
178+ sourceCoordinator .giveUpPartition (workerPartition );
179+ }
180+ }
181+
118182 private void processPartition (EnhancedSourcePartition partition , Buffer <Record <Event >> buffer ) {
119183 // Implement your source extraction logic here
120184 // Update the partition state or commit the partition as needed
0 commit comments