Implement handling strategy for retryable vs non-retryable exceptions in workerPartition #6270

Merged
san81 merged 5 commits into opensearch-project:main from vecheka:feature/worker-partition-retry-strategy
Jan 15, 2026

@vecheka vecheka commented Nov 14, 2025

Contributor

Description

This is the second part of the change to handle retryable vs. non-retryable exceptions.

First part PR for more context: #6255

How

We are adding a new generic exception class, SaaSCrawlerException, to be shared by all connectors. It is similar to the previous API-specific exception classes (e.g., Office365Exception).

CrawlerException will follow two criteria:

  • Any REST API call failure is considered retryable. A failure while writing to the buffer is also considered retryable.
  • Other exceptions (e.g., internal failures) are considered non-retryable.
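To illustrate the two criteria above, here is a minimal sketch of an exception that carries a retryable flag. It is not the class body from this PR: the constructor shape, the `isRetryable()` accessor, and the `restCallFailed` / `internalFailure` helpers are assumptions made for the example.

```java
import java.io.IOException;

final class CrawlerExceptionSketch {

    /** Illustrative stand-in for the shared exception; the real class may differ. */
    static final class SaaSCrawlerException extends RuntimeException {
        private final boolean retryable;

        SaaSCrawlerException(final String message, final Throwable cause, final boolean retryable) {
            super(message, cause);
            this.retryable = retryable;
        }

        boolean isRetryable() {
            return retryable;
        }
    }

    // Hypothetical helper: a failed REST API call is flagged as retryable.
    static SaaSCrawlerException restCallFailed(final IOException cause) {
        return new SaaSCrawlerException("REST API call failed", cause, true);
    }

    // Hypothetical helper: an internal failure is flagged as non-retryable.
    static SaaSCrawlerException internalFailure(final Throwable cause) {
        return new SaaSCrawlerException("Internal failure", cause, false);
    }

    public static void main(final String[] args) {
        System.out.println(restCallFailed(new IOException("timeout")).isRetryable());
        System.out.println(internalFailure(new IllegalStateException("bad state")).isRetryable());
    }
}
```

Carrying the flag on the exception lets the code that detects the failure classify it once, so callers further up only need to read the flag.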

We will use CrawlerException and propagate it all the way up to WorkerScheduler, where, in the follow-up PR:

  • If the exception is flagged as "retryable = true", we will continue with the current behaviour of backoff retry every 5ms.
  • Otherwise, we will delay the retry by 1 day by calling sourceCoordinator.saveProgressStateForPartition(workerPartition, DURATION_TO_DELAY_RETRY). If the partition keeps failing for up to 30 days (measured from the partitionCreationTime field), we will give up the worker partition.
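The decision described in the two bullets above can be sketched as a small pure function. This is only an illustration under stated assumptions: the `Action` enum, the `decide` method, and the `DURATION_TO_GIVE_UP_RETRY` constant are hypothetical names, and the 1-day and 30-day values are taken from the description rather than from the merged code.

```java
import java.time.Duration;
import java.time.Instant;

final class RetryDecisionSketch {

    /** Hypothetical outcome type for this sketch. */
    enum Action { BACKOFF_RETRY, DELAY_RETRY, GIVE_UP }

    // Values from the PR description; DURATION_TO_GIVE_UP_RETRY is an assumed name.
    static final Duration DURATION_TO_DELAY_RETRY = Duration.ofDays(1);
    static final Duration DURATION_TO_GIVE_UP_RETRY = Duration.ofDays(30);

    static Action decide(final boolean retryable, final Instant partitionCreationTime, final Instant now) {
        if (retryable) {
            // Retryable failures keep the existing short backoff loop.
            return Action.BACKOFF_RETRY;
        }
        // Non-retryable failures are delayed until the partition is too old.
        final Duration age = Duration.between(partitionCreationTime, now);
        return age.compareTo(DURATION_TO_GIVE_UP_RETRY) <= 0 ? Action.DELAY_RETRY : Action.GIVE_UP;
    }

    public static void main(final String[] args) {
        final Instant created = Instant.parse("2025-11-14T00:00:00Z");
        System.out.println(decide(true, created, created.plus(Duration.ofDays(2))));
        System.out.println(decide(false, created, created.plus(Duration.ofDays(2))));
        System.out.println(decide(false, created, created.plus(Duration.ofDays(31))));
    }
}
```

Passing `now` in as a parameter, instead of calling `Instant.now()` inside the method, keeps the age check deterministic in unit tests.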

Is this change backward compatible?

Yes. We keep the catch block on the generic "Exception", so all other connectors still use the default behaviour of backoff retry every 5ms for all exception types.
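A rough sketch of why the generic catch block preserves the old behaviour: the specific catch runs first, and anything else falls through to the generic handler. The `handle` method, its string results, and the nested exception class are hypothetical and only stand in for the real scheduler code.

```java
final class CatchOrderSketch {

    /** Illustrative stand-in for the shared exception; the real class may differ. */
    static final class SaaSCrawlerException extends RuntimeException {
        private final boolean retryable;

        SaaSCrawlerException(final String message, final boolean retryable) {
            super(message);
            this.retryable = retryable;
        }

        boolean isRetryable() {
            return retryable;
        }
    }

    // Hypothetical handler: returns the strategy that would be applied.
    static String handle(final Runnable work) {
        try {
            work.run();
            return "ok";
        } catch (final SaaSCrawlerException e) {
            // Connectors that throw the new exception opt in to the new strategy.
            return e.isRetryable() ? "backoff" : "delay";
        } catch (final Exception e) {
            // Every other exception type keeps the default backoff retry.
            return "backoff";
        }
    }

    public static void main(final String[] args) {
        System.out.println(handle(() -> { throw new SaaSCrawlerException("internal", false); }));
        System.out.println(handle(() -> { throw new IllegalStateException("legacy connector"); }));
    }
}
```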

Testing

Unit tests. The following ran successfully:

./gradlew :data-prepper-plugins:saas-source-plugins:microsoft-office365-source:test \
--tests "org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365SourceConfigTest"


./gradlew :data-prepper-plugins:saas-source-plugins:source-crawler:test \
--tests "org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerSchedulerTest"

./gradlew :data-prepper-plugins:saas-source-plugins:microsoft-office365-source:checkstyleTest  
./gradlew :data-prepper-plugins:saas-source-plugins:source-crawler:checkstyleTest  

Local testing:

dev-dsk-vecheka-2b-d8fbd7a6 % bin/data-prepper 
Reading pipelines and data-prepper configuration files from Data Prepper home directory.
/usr/lib/jvm/java-11-amazon-corretto.x86_64/bin/java
Found openjdk version  of 11.0
2025-11-14T01:26:09,540 [main] INFO  org.opensearch.dataprepper.pipeline.parser.transformer.DynamicConfigTransformer - No transformation needed
2025-11-14T01:26:12,046 [main] INFO  org.opensearch.dataprepper.plugins.kafka.extension.KafkaClusterConfigExtension - Applying Kafka Cluster Config Extension.
2025-11-14T01:26:12,751 [main] INFO  org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365Source - Creating Office365 Source Plugin
2025-11-14T01:26:12,806 [main] WARN  org.opensearch.dataprepper.core.pipeline.server.config.DataPrepperServerConfiguration - Creating data prepper server without authentication. This is not secure.
2025-11-14T01:26:12,806 [main] WARN  org.opensearch.dataprepper.core.pipeline.server.config.DataPrepperServerConfiguration - In order to set up Http Basic authentication for the data prepper server, go here: https://github.com/opensearch-project/data-prepper/blob/main/docs/core_apis.md#authentication
2025-11-14T01:26:13,908 [test-365-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365Source - Starting Office365 Source Plugin...
2025-11-14T01:26:13,909 [test-365-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.auth.AuthenticationInterface - Initializing credentials.
2025-11-14T01:26:13,909 [test-365-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationProvider - Getting new access token for Office 365 Management API
2025-11-14T01:26:13,915 [main] WARN  org.opensearch.dataprepper.core.pipeline.server.HttpServerProvider - Creating Data Prepper server without TLS. This is not secure.
2025-11-14T01:26:13,915 [main] WARN  org.opensearch.dataprepper.core.pipeline.server.HttpServerProvider - In order to set up TLS for the Data Prepper server, go here: https://github.com/opensearch-project/data-prepper/blob/main/docs/configuration.md#server-configuration
2025-11-14T01:26:13,924 [test-365-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.aws.AwsSecretsSupplier - Retrieving latest secrets in aws:secrets:office365-credentials.
2025-11-14T01:26:14,037 [test-365-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.aws.AwsSecretsSupplier - Finished retrieving latest secret in aws:secrets:office365-credentials.
2025-11-14T01:26:14,037 [test-365-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.aws.AwsSecretsSupplier - Retrieving latest secrets in aws:secrets:office365-credentials.
2025-11-14T01:26:14,133 [test-365-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.aws.AwsSecretsSupplier - Finished retrieving latest secret in aws:secrets:office365-credentials.
2025-11-14T01:26:14,581 [test-365-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationProvider - Received new access token. Expires in 3599 seconds
2025-11-14T01:26:14,581 [test-365-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.auth.AuthenticationInterface - Credentials initialized successfully
2025-11-14T01:26:14,581 [test-365-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365RestClient - Starting Office 365 subscriptions for audit logs
2025-11-14T01:26:16,079 [test-365-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourcePlugin - Starting microsoft_office365 Source Plugin
2025-11-14T01:26:16,198 [pool-6-thread-2] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler - Worker thread started
2025-11-14T01:26:16,198 [pool-6-thread-3] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler - Worker thread started
2025-11-14T01:26:16,198 [pool-6-thread-2] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler - Processing Partitions
2025-11-14T01:26:16,198 [pool-6-thread-4] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler - Worker thread started
2025-11-14T01:26:16,198 [pool-6-thread-3] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler - Processing Partitions
2025-11-14T01:26:16,199 [pool-6-thread-5] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler - Worker thread started
2025-11-14T01:26:16,199 [pool-6-thread-4] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler - Processing Partitions
2025-11-14T01:26:16,199 [pool-6-thread-5] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler - Processing Partitions
2025-11-14T01:26:16,625 [pool-6-thread-1] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.LeaderScheduler - Running as a LEADER node
2025-11-14T01:26:16,625 [pool-6-thread-1] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.base.DimensionalTimeSliceCrawler - Total partitions created in this crawl: 0.0
2025-11-14T01:26:42,045 [pool-3-thread-1] INFO  org.opensearch.dataprepper.plugins.aws.AwsSecretsSupplier - Retrieving latest secrets in aws:secrets:office365-credentials.
2025-11-14T01:26:42,146 [pool-3-thread-1] INFO  org.opensearch.dataprepper.plugins.aws.AwsSecretsSupplier - Finished retrieving latest secret in aws:secrets:office365-credentials.
2025-11-14T01:27:12,045 [pool-3-thread-1] INFO  org.opensearch.dataprepper.plugins.aws.AwsSecretsSupplier - Retrieving latest secrets in aws:secrets:office365-credentials.
2025-11-14T01:27:12,149 [pool-3-thread-1] INFO  org.opensearch.dataprepper.plugins.aws.AwsSecretsSupplier - Finished retrieving latest secret in aws:secrets:office365-credentials.
2025-11-14T01:27:16,715 [pool-6-thread-1] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.base.DimensionalTimeSliceCrawler - Total partitions created in this crawl: 0.0

Issues Resolved

N/A

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@vecheka vecheka force-pushed the feature/worker-partition-retry-strategy branch 2 times, most recently from e91a9aa to fc43488 Compare November 14, 2025 02:57
…in workerPartition

Signed-off-by: Vecheka Chhourn <vecheka@amazon.com>
@vecheka vecheka force-pushed the feature/worker-partition-retry-strategy branch from fc43488 to 0930903 Compare November 14, 2025 02:58
* @param ex - exception thrown by workerScheduler
* @return boolean: true if we should fallback to backoffRetry
*/
private boolean delayRetry(Optional<EnhancedSourcePartition> sourcePartition, Exception ex) {

Contributor

nit.

I suggest renaming this function and isWorkerPartitionLeaseExtended if there is a new revision.

delayRetry and backoffRetry can be renamed to something like shouldLocalRetry, because extending the partition lease is also a kind of retry at the partition level.

isWorkerPartitionLeaseExtended is not 100% accurate, because there is a scenario where the partition is given up and the lease is not extended. It should also be named shouldLocalRetry.

Contributor Author

That's a good point. I've renamed them for better readability. Thanks!

…retry-strategy

Signed-off-by: Vecheka Chhourn <vecheka@amazon.com>
@vecheka vecheka force-pushed the feature/worker-partition-retry-strategy branch from 60e5f63 to ee15c10 Compare December 18, 2025 03:36
log.info("Updating workerPartition {}", workerPartition.getPartitionKey());
Duration age = Duration.between(partitionCreationTime, Instant.now());
if (age.compareTo(this.sourceConfig.getDurationToGiveUpRetry()) <= 0) {
    log.info("Partition {} is within or equal to the configured max duration, scheduling retry", workerPartition.getPartitionKey());

Collaborator

This log could be noisy.

Contributor Author

Makes sense, will update this.

Comment on lines +144 to +147
if (progressState instanceof DimensionalTimeSliceWorkerProgressState) {
    DimensionalTimeSliceWorkerProgressState workerProgressState = (DimensionalTimeSliceWorkerProgressState) progressState;
    updateWorkerPartition(workerProgressState.getPartitionCreationTime(), workerPartition);
    shouldLocalRetry = false;

Collaborator

Is the intention of the logic to update the worker partition whenever partitionCreationTime exists in the state, or to do this only for DimensionalTimeSliceWorkerProgressState?

Contributor Author

Currently, only DimensionalTimeSliceWorkerProgressState has the partitionCreationTime field, but ideally we will follow up by adding a partitionCreationTime field to the other SaasWorkerProgressState types.

…retry-strategy

Signed-off-by: Vecheka Chhourn <vecheka@amazon.com>
@vecheka vecheka force-pushed the feature/worker-partition-retry-strategy branch from 3f4074e to 274b74b Compare January 13, 2026 18:39
@san81 san81 merged commit a477b9f into opensearch-project:main Jan 15, 2026
49 of 52 checks passed
san81 pushed a commit to san81/data-prepper that referenced this pull request Jan 27, 2026
…in workerPartition (opensearch-project#6270)

Signed-off-by: Vecheka Chhourn <vecheka@amazon.com>
simonelbaz pushed a commit to simonelbaz/data-prepper that referenced this pull request Jan 31, 2026
…in workerPartition (opensearch-project#6270)

Signed-off-by: Vecheka Chhourn <vecheka@amazon.com>
Signed-off-by: Simon ELBAZ <elbazsimon9@gmail.com>