Skip to content

Commit 3fa0361

Browse files
vechekasimonelbaz
authored andcommitted
Implement handling strategy for retryable vs non-retryable exceptons in workerPartition (opensearch-project#6270)
Signed-off-by: Vecheka Chhourn <vecheka@amazon.com>
1 parent 3417a57 commit 3fa0361

4 files changed

Lines changed: 291 additions & 67 deletions

File tree

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365SourceConfigTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,4 +101,10 @@ void testNegativeDurationRange() throws Exception {
101101

102102
assertEquals(0, config.getLookBackHours());
103103
}
104+
105+
@Test
106+
void testDefaultDurationValues() {
107+
assertEquals(Duration.ofDays(30), config.getDurationToGiveUpRetry());
108+
assertEquals(Duration.ofDays(1), config.getDurationToDelayRetry());
109+
}
104110
}

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,21 @@
11
package org.opensearch.dataprepper.plugins.source.source_crawler.base;
22

3+
import java.time.Duration;
4+
35
/**
46
* Marker interface to all the SAAS connectors configuration
57
*/
68
public interface CrawlerSourceConfig {
79

810
int DEFAULT_NUMBER_OF_WORKERS = 1;
911

12+
/*
13+
* Retry settings for non-retrayble exceptions in workerPartition
14+
* default to 30 days to giveup retry; and 1 day to delay retry
15+
*/
16+
Duration DEFAULT_MAX_DURATION_TO_GIVEUP_RETRY = Duration.ofDays(30);
17+
Duration DEFAULT_MAX_DURATION_TO_DELAY_RETRY = Duration.ofDays(1);
18+
1019
/**
1120
* Number of worker threads enabled for this source
1221
*
@@ -20,4 +29,20 @@ public interface CrawlerSourceConfig {
2029
* @return boolean indicating acknowledgement state
2130
*/
2231
boolean isAcknowledgments();
32+
33+
/**
34+
* Duration to give up retrying workerPartition's work on non-retrayble exceptions
35+
* @return Duration indicating max duration to give up retrying
36+
*/
37+
default Duration getDurationToGiveUpRetry() {
38+
return DEFAULT_MAX_DURATION_TO_GIVEUP_RETRY;
39+
}
40+
41+
/**
42+
* Duration to retry workerPartition's work on non-retrayble exceptions
43+
* @return Duration indicating max duration to delay retrying
44+
*/
45+
default Duration getDurationToDelayRetry() {
46+
return DEFAULT_MAX_DURATION_TO_DELAY_RETRY;
47+
}
2348
}

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerScheduler.java

Lines changed: 71 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,19 @@
1212
import org.opensearch.dataprepper.plugins.source.source_crawler.base.Crawler;
1313
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig;
1414
import org.opensearch.dataprepper.plugins.source.source_crawler.base.SaasWorkerProgressState;
15+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState;
1516
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition;
17+
import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException;
1618

1719
import com.google.common.annotations.VisibleForTesting;
1820
import org.slf4j.Logger;
1921
import org.slf4j.LoggerFactory;
2022

2123
import java.time.Duration;
2224
import java.util.Optional;
25+
import java.time.Instant;
26+
27+
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
2328

2429
/**
2530
* Worker class for executing the partitioned work created while crawling a source.
@@ -75,10 +80,10 @@ public void run() {
7580
log.info("Worker thread started");
7681
log.info("Processing Partitions");
7782
while (!Thread.currentThread().isInterrupted()) {
83+
Optional<EnhancedSourcePartition> partition = Optional.empty();
7884
try {
7985
// Get the next available partition from the coordinator
80-
Optional<EnhancedSourcePartition> partition =
81-
sourceCoordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE);
86+
partition = sourceCoordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE);
8287
if (partition.isPresent()) {
8388
// Process the partition (source extraction logic)
8489
processPartition(partition.get(), buffer);
@@ -94,27 +99,86 @@ public void run() {
9499
}
95100
}
96101
} catch (Exception e) {
97-
// TODO: will be in a followup to handle retry strategy differently for non-retryable exceptions
98-
backoffRetry(e);
102+
this.parititionsFailedCounter.increment();
103+
// always default to backoffRetry strategy
104+
boolean shouldLocalRetry = true;
105+
if (e instanceof SaaSCrawlerException) {
106+
SaaSCrawlerException saasException = (SaaSCrawlerException) e;
107+
if (!saasException.isRetryable()) {
108+
shouldLocalRetry = delayWorkerPartitionRetry(partition, e);
109+
}
110+
}
111+
if (shouldLocalRetry) {
112+
backoffRetry(e);
113+
}
99114
}
100115
}
101116
log.warn("SourceItemWorker Scheduler is interrupted, looks like shutdown has triggered");
102117
}
103118

104119
/**
105-
* Default behaviour of backoff retry workerScheduler by sleeping RETRY_BACKOFF_ON_EXCEPTION_MILLIS
120+
* Default behaviour of backoffRetry workerScheduler by sleeping RETRY_BACKOFF_ON_EXCEPTION_MILLIS
106121
* @param e - exception thrown by workerScheduler
107122
*/
108123
private void backoffRetry(Exception e) {
109-
this.parititionsFailedCounter.increment();
110-
log.error("Error processing partition", e);
124+
log.error("[Retryable Exception] Error processing partition", e);
111125
try {
112126
Thread.sleep(RETRY_BACKOFF_ON_EXCEPTION_MILLIS);
113127
} catch (InterruptedException ex) {
114128
log.warn("Thread interrupted while waiting to retry due to {}", ex.getMessage());
115129
}
116130
}
117131

132+
/**
133+
* Delay retry on a workerPartition by X Duration (current default = 1 day) for all non-retryble exceptions up to X days (current default = 30 days)
134+
* @param sourcePartition - information on WorkerPartition state
135+
* @param ex - exception thrown by workerScheduler
136+
* @return boolean: true if we should fallback to localRetry
137+
*/
138+
private boolean delayWorkerPartitionRetry(Optional<EnhancedSourcePartition> sourcePartition, Exception ex) {
139+
log.error("[Non-Retryable Exception] Error processing worker partition. Will delay retry with the configured duration", ex);
140+
try {
141+
SaasSourcePartition workerPartition = (SaasSourcePartition) sourcePartition.get();
142+
boolean shouldLocalRetry = true;
143+
if (workerPartition != null) {
144+
SaasWorkerProgressState progressState = (SaasWorkerProgressState) workerPartition.getProgressState().get();
145+
// TODO: ideally we should add partitionCreationTime for all type of SaasWorkerProgressState
146+
if (progressState instanceof DimensionalTimeSliceWorkerProgressState) {
147+
DimensionalTimeSliceWorkerProgressState workerProgressState = (DimensionalTimeSliceWorkerProgressState) progressState;
148+
updateWorkerPartition(workerProgressState.getPartitionCreationTime(), workerPartition);
149+
shouldLocalRetry = false;
150+
}
151+
}
152+
153+
// other SaasWorkerProgressState types (not DimensionalTimeSliceWorkerProgressState) should never use delayWorkerPartitionRetry()
154+
// to be safe, fallback to default retry strategy
155+
return shouldLocalRetry;
156+
} catch (Exception e) {
157+
log.error("Error updating workerPartition ", e);
158+
// on exception, do not interrupt thread and retry again
159+
return false;
160+
}
161+
}
162+
163+
164+
/**
165+
* Update the workerPartition if the partitionCreationTime <= max days to keep retrying (current default = 30 days) on nonretryable exceptions.
166+
* Otherwise, give up the workerPartition.
167+
* @param partitionCreationTime - timestamp in epoch when the worker partition was first created
168+
* @param workerPartition - information on WorkerPartition state
169+
*/
170+
private void updateWorkerPartition(Instant partitionCreationTime, SaasSourcePartition workerPartition) {
171+
log.info("Updating workerPartition {}", workerPartition.getPartitionKey());
172+
Duration age = Duration.between(partitionCreationTime, Instant.now());
173+
if (age.compareTo(this.sourceConfig.getDurationToGiveUpRetry()) <= 0) {
174+
log.info(NOISY, "Partition {} is within or equal to the configured max duration, scheduling retry", workerPartition.getPartitionKey());
175+
sourceCoordinator.saveProgressStateForPartition(workerPartition, this.sourceConfig.getDurationToDelayRetry());
176+
} else {
177+
log.info("Partition {} is older than the configured max duration, giving up", workerPartition.getPartitionKey());
178+
sourceCoordinator.giveUpPartition(workerPartition);
179+
}
180+
}
181+
118182
private void processPartition(EnhancedSourcePartition partition, Buffer<Record<Event>> buffer) {
119183
// Implement your source extraction logic here
120184
// Update the partition state or commit the partition as needed

0 commit comments

Comments
 (0)