diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java index e6a7c5ee92..074f83cd9c 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java @@ -65,6 +65,11 @@ public void run() { try { coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES); } catch (final Exception e) { + // If, for any reason, the leader scheduler is unable to reach the source coordination DDB table, + // it should give up the leader partition and re-acquire it once the DDB store is reachable again. + // Not giving up would leave it holding on to a record whose ownership has expired, which creates inconsistent state issues. + // If you are not the owner in the DDB table, then you are not supposed to hold the ownership. + leaderPartition = null; LOG.error("Failed to save Leader partition state. 
This process will retry."); } } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java index d37918d418..fb8d512707 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java @@ -1,5 +1,6 @@ package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -18,6 +19,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; @@ -27,7 +29,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import static org.junit.jupiter.api.Assertions.assertThrows; @ExtendWith(MockitoExtension.class) public class LeaderSchedulerTest { @@ -169,13 +170,56 @@ void testWhileLoopRunnningAfterTheSleep() throws InterruptedException { void testSetLeaderProgressState_throwsExceptionOnStateMismatch() { // Create a LeaderPartition with TokenPaginationCrawlerLeaderProgressState LeaderPartition leaderPartition = new LeaderPartition(new 
TokenPaginationCrawlerLeaderProgressState("")); - + // Try to set a different type of state (PaginationCrawlerLeaderProgressState) PaginationCrawlerLeaderProgressState incompatibleState = new PaginationCrawlerLeaderProgressState(Instant.now()); - + // Verify that RuntimeException is thrown due to state type mismatch RuntimeException exception = assertThrows(RuntimeException.class, () -> { leaderPartition.setLeaderProgressState(incompatibleState); }); } + + @Test + @DisplayName("Ensure that if DynamoDB becomes unreachable, the leader gives up the partition and retries acquisition") + void testLeaderPartitionGivenUpOnSaveFailure_andRetryAcquire() throws InterruptedException { + // Verifies that LeaderScheduler resets leaderPartition to null when saveProgressStateForPartition fails. + // This ensures that if DynamoDB becomes unreachable, the leader gives up the partition and retries acquisition. + + LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, crawler); + leaderScheduler.setLeaseInterval(Duration.ofMillis(10)); + + LeaderPartition leaderPartition = new LeaderPartition(new TokenPaginationCrawlerLeaderProgressState("")); + TokenPaginationCrawlerLeaderProgressState state = (TokenPaginationCrawlerLeaderProgressState) leaderPartition.getProgressState().get(); + state.setInitialized(true); + state.setLastToken(LAST_TOKEN); + + // First acquisition succeeds, but the subsequent saveProgressStateForPartition fails (simulating a DynamoDB outage). + // A second acquisition should then be attempted after giving up the partition. + when(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)) + .thenReturn(Optional.of(leaderPartition)) // First acquisition succeeds + .thenReturn(Optional.of(leaderPartition)); // Second acquisition after giving up partition + + // The first saveProgressStateForPartition call throws to simulate a DynamoDB outage + doThrow(new RuntimeException("DynamoDB unreachable")) + .doNothing() // Second time succeeds after recovery + 
.when(coordinator).saveProgressStateForPartition(any(LeaderPartition.class), any(Duration.class)); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(leaderScheduler); + + // Wait long enough for multiple iterations + Thread.sleep(100); + executorService.shutdownNow(); + + // Verify that crawler was called at least twice (showing the scheduler continued to work) + verify(crawler, atLeast(2)).crawl(any(LeaderPartition.class), any(EnhancedSourceCoordinator.class)); + + // Verify that acquireAvailablePartition was called at least twice: + // once for initial acquisition, and again after giving up due to save failure + verify(coordinator, atLeast(2)).acquireAvailablePartition(LeaderPartition.PARTITION_TYPE); + + // Verify that saveProgressStateForPartition was attempted at least twice + verify(coordinator, atLeast(2)).saveProgressStateForPartition(any(LeaderPartition.class), any(Duration.class)); + } }