Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,10 @@ void testNegativeDurationRange() throws Exception {

assertEquals(0, config.getLookBackHours());
}

@Test
void testDefaultDurationValues() {
assertEquals(Duration.ofDays(30), config.getDurationToGiveUpRetry());
assertEquals(Duration.ofDays(1), config.getDurationToDelayRetry());
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.base;

import java.time.Duration;

/**
* Marker interface to all the SAAS connectors configuration
*/
public interface CrawlerSourceConfig {

int DEFAULT_NUMBER_OF_WORKERS = 1;

/*
* Retry settings for non-retrayble exceptions in workerPartition
* default to 30 days to giveup retry; and 1 day to delay retry
*/
Duration DEFAULT_MAX_DURATION_TO_GIVEUP_RETRY = Duration.ofDays(30);
Duration DEFAULT_MAX_DURATION_TO_DELAY_RETRY = Duration.ofDays(1);

/**
* Number of worker threads enabled for this source
*
Expand All @@ -20,4 +29,20 @@ public interface CrawlerSourceConfig {
* @return boolean indicating acknowledgement state
*/
boolean isAcknowledgments();

/**
* Duration to give up retrying workerPartition's work on non-retrayble exceptions
* @return Duration indicating max duration to give up retrying
*/
default Duration getDurationToGiveUpRetry() {
return DEFAULT_MAX_DURATION_TO_GIVEUP_RETRY;
}

/**
* Duration to retry workerPartition's work on non-retrayble exceptions
* @return Duration indicating max duration to delay retrying
*/
default Duration getDurationToDelayRetry() {
return DEFAULT_MAX_DURATION_TO_DELAY_RETRY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@
import org.opensearch.dataprepper.plugins.source.source_crawler.base.Crawler;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.SaasWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.DimensionalTimeSliceWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition;
import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Optional;
import java.time.Instant;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;

/**
* Worker class for executing the partitioned work created while crawling a source.
Expand Down Expand Up @@ -75,10 +80,10 @@ public void run() {
log.info("Worker thread started");
log.info("Processing Partitions");
while (!Thread.currentThread().isInterrupted()) {
Optional<EnhancedSourcePartition> partition = Optional.empty();
try {
// Get the next available partition from the coordinator
Optional<EnhancedSourcePartition> partition =
sourceCoordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE);
partition = sourceCoordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE);
if (partition.isPresent()) {
// Process the partition (source extraction logic)
processPartition(partition.get(), buffer);
Expand All @@ -94,27 +99,86 @@ public void run() {
}
}
} catch (Exception e) {
// TODO: will be in a followup to handle retry strategy differently for non-retryable exceptions
backoffRetry(e);
this.parititionsFailedCounter.increment();
// always default to backoffRetry strategy
boolean shouldLocalRetry = true;
if (e instanceof SaaSCrawlerException) {
SaaSCrawlerException saasException = (SaaSCrawlerException) e;
if (!saasException.isRetryable()) {
shouldLocalRetry = delayWorkerPartitionRetry(partition, e);
}
}
if (shouldLocalRetry) {
backoffRetry(e);
}
}
}
log.warn("SourceItemWorker Scheduler is interrupted, looks like shutdown has triggered");
}

/**
* Default behaviour of backoff retry workerScheduler by sleeping RETRY_BACKOFF_ON_EXCEPTION_MILLIS
* Default behaviour of backoffRetry workerScheduler by sleeping RETRY_BACKOFF_ON_EXCEPTION_MILLIS
* @param e - exception thrown by workerScheduler
*/
private void backoffRetry(Exception e) {
this.parititionsFailedCounter.increment();
log.error("Error processing partition", e);
log.error("[Retryable Exception] Error processing partition", e);
try {
Thread.sleep(RETRY_BACKOFF_ON_EXCEPTION_MILLIS);
} catch (InterruptedException ex) {
log.warn("Thread interrupted while waiting to retry due to {}", ex.getMessage());
}
}

/**
* Delay retry on a workerPartition by X Duration (current default = 1 day) for all non-retryble exceptions up to X days (current default = 30 days)
* @param sourcePartition - information on WorkerPartition state
* @param ex - exception thrown by workerScheduler
* @return boolean: true if we should fallback to localRetry
*/
private boolean delayWorkerPartitionRetry(Optional<EnhancedSourcePartition> sourcePartition, Exception ex) {
log.error("[Non-Retryable Exception] Error processing worker partition. Will delay retry with the configured duration", ex);
try {
SaasSourcePartition workerPartition = (SaasSourcePartition) sourcePartition.get();
boolean shouldLocalRetry = true;
if (workerPartition != null) {
SaasWorkerProgressState progressState = (SaasWorkerProgressState) workerPartition.getProgressState().get();
// TODO: ideally we should add partitionCreationTime for all type of SaasWorkerProgressState
if (progressState instanceof DimensionalTimeSliceWorkerProgressState) {
DimensionalTimeSliceWorkerProgressState workerProgressState = (DimensionalTimeSliceWorkerProgressState) progressState;
updateWorkerPartition(workerProgressState.getPartitionCreationTime(), workerPartition);
shouldLocalRetry = false;
Comment on lines +146 to +149

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intention of the logic is to update the worker partition when partitionCreationTime exists in the state? or do this only for DimensionalTimeSliceWorkerProgressState ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, only DimensionalTimeSliceWorkerProgressState has partitionCreationTime field but ideally we will followup with adding partitionCreationTime field to other SaasWorkerProgressState types.

}
}

// other SaasWorkerProgressState types (not DimensionalTimeSliceWorkerProgressState) should never use delayWorkerPartitionRetry()
// to be safe, fallback to default retry strategy
return shouldLocalRetry;
} catch (Exception e) {
log.error("Error updating workerPartition ", e);
// on exception, do not interrupt thread and retry again
return false;
}
}


/**
* Update the workerPartition if the partitionCreationTime <= max days to keep retrying (current default = 30 days) on nonretryable exceptions.
* Otherwise, give up the workerPartition.
* @param partitionCreationTime - timestamp in epoch when the worker partition was first created
* @param workerPartition - information on WorkerPartition state
*/
private void updateWorkerPartition(Instant partitionCreationTime, SaasSourcePartition workerPartition) {
log.info("Updating workerPartition {}", workerPartition.getPartitionKey());
Duration age = Duration.between(partitionCreationTime, Instant.now());
if (age.compareTo(this.sourceConfig.getDurationToGiveUpRetry()) <= 0) {
log.info(NOISY, "Partition {} is within or equal to the configured max duration, scheduling retry", workerPartition.getPartitionKey());
sourceCoordinator.saveProgressStateForPartition(workerPartition, this.sourceConfig.getDurationToDelayRetry());
} else {
log.info("Partition {} is older than the configured max duration, giving up", workerPartition.getPartitionKey());
sourceCoordinator.giveUpPartition(workerPartition);
}
}

private void processPartition(EnhancedSourcePartition partition, Buffer<Record<Event>> buffer) {
// Implement your source extraction logic here
// Update the partition state or commit the partition as needed
Expand Down
Loading
Loading