From 7b3b5cc2d991d32a160d95565d39e99d2235b60e Mon Sep 17 00:00:00 2001 From: Santhosh Gandhe <1909520+san81@users.noreply.github.com> Date: Wed, 29 Oct 2025 16:35:44 -0700 Subject: [PATCH 1/3] Fixing the crawler framework to handle ddb outage scenario Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> --- .../coordination/scheduler/LeaderScheduler.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java index e6a7c5ee92..074f83cd9c 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java @@ -65,6 +65,11 @@ public void run() { try { coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES); } catch (final Exception e) { + // For any reason, if the leader schedule unable to reach source coordination ddb table, + // it should give up leader partition and require whenever ddb store reachable again + // Not giving up will make this to continue hold on to an ownership expired record which will create inconsistent state issues + // if you are not the owner in ddb table, then you are not supposed to hold the ownership + leaderPartition = null; LOG.error("Failed to save Leader partition state. This process will retry."); } } From 308e3ef04667ccb594fdc2ac23d8c2c70f9ce11f Mon Sep 17 00:00:00 2001 From: Santhosh Gandhe <1909520+san81@users.noreply.github.com> Date: Thu, 30 Oct 2025 10:18:09 -0700 Subject: [PATCH 2/3] Adding corresponding unit test to validate reacquiring of leader partition state after ddb calls succeed Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> --- .../scheduler/LeaderSchedulerTest.java | 46 ++++++++++++++++++- 1 file changed, 44 insertions(+), 2 deletions(-) diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java index d37918d418..32fdefec60 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java @@ -169,13 +169,55 @@ void testWhileLoopRunnningAfterTheSleep() throws InterruptedException { void testSetLeaderProgressState_throwsExceptionOnStateMismatch() { // Create a LeaderPartition with TokenPaginationCrawlerLeaderProgressState LeaderPartition leaderPartition = new LeaderPartition(new TokenPaginationCrawlerLeaderProgressState("")); - + // Try to set a different type of state (PaginationCrawlerLeaderProgressState) PaginationCrawlerLeaderProgressState incompatibleState = new PaginationCrawlerLeaderProgressState(Instant.now()); - + // Verify that RuntimeException is thrown due to state type mismatch RuntimeException exception = assertThrows(RuntimeException.class, () -> { leaderPartition.setLeaderProgressState(incompatibleState); }); } + + @Test + void testLeaderPartitionGivenUpOnSaveFailure_andRetryAcquire() throws InterruptedException { + // This test verifies the fix for line 72: leaderPartition = null when saveProgressStateForPartition fails + // This ensures that if DynamoDB becomes unreachable, the leader gives up the partition and retries acquisition + + LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, crawler); + leaderScheduler.setLeaseInterval(Duration.ofMillis(10)); + + LeaderPartition leaderPartition = new LeaderPartition(new TokenPaginationCrawlerLeaderProgressState("")); + TokenPaginationCrawlerLeaderProgressState state = (TokenPaginationCrawlerLeaderProgressState) leaderPartition.getProgressState().get(); + state.setInitialized(true); + state.setLastToken(LAST_TOKEN); + + // First acquisition succeeds, but subsequent saveProgressStateForPartition fails (simulating DynamoDB outage) + // Then second acquisition should be attempted after giving up the partition + when(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)) + .thenReturn(Optional.of(leaderPartition)) // First acquisition succeeds + .thenReturn(Optional.of(leaderPartition)); // Second acquisition after giving up partition + + // saveProgressStateForPartition throws exception to simulate DynamoDB outage + doThrow(new RuntimeException("DynamoDB unreachable")) + .doNothing() // Second time succeeds after recovery + .when(coordinator).saveProgressStateForPartition(any(LeaderPartition.class), any(Duration.class)); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(leaderScheduler); + + // Wait long enough for multiple iterations + Thread.sleep(100); + executorService.shutdownNow(); + + // Verify that crawler was called multiple times (showing the scheduler continued to work) + verify(crawler, atLeast(2)).crawl(any(LeaderPartition.class), any(EnhancedSourceCoordinator.class)); + + // Verify that acquireAvailablePartition was called at least twice: + // once for initial acquisition, and again after giving up due to save failure + verify(coordinator, atLeast(2)).acquireAvailablePartition(LeaderPartition.PARTITION_TYPE); + + // Verify that saveProgressStateForPartition was attempted multiple times + verify(coordinator, atLeast(2)).saveProgressStateForPartition(any(LeaderPartition.class), any(Duration.class)); + } } From 39b6e0285aad3a0ab2dafbfc9dd1e17d9632078e Mon Sep 17 00:00:00 2001 From: Santhosh Gandhe <1909520+san81@users.noreply.github.com> Date: Thu, 30 Oct 2025 12:05:31 -0700 Subject: [PATCH 3/3] Better display name Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> --- .../coordination/scheduler/LeaderSchedulerTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java index 32fdefec60..fb8d512707 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java @@ -1,5 +1,6 @@ package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -18,6 +19,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; @@ -27,7 +29,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import static org.junit.jupiter.api.Assertions.assertThrows; @ExtendWith(MockitoExtension.class) public class LeaderSchedulerTest { @@ -180,6 +181,7 @@ void testSetLeaderProgressState_throwsExceptionOnStateMismatch() { } @Test + @DisplayName("Ensure that if DynamoDB becomes unreachable, the leader gives up the partition and retries acquisition") void testLeaderPartitionGivenUpOnSaveFailure_andRetryAcquire() throws InterruptedException { // This test verifies the fix for line 72: leaderPartition = null when saveProgressStateForPartition fails // This ensures that if DynamoDB becomes unreachable, the leader gives up the partition and retries acquisition