Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public void run() {
try {
coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
} catch (final Exception e) {
// If, for any reason, the leader scheduler is unable to reach the source coordination DDB table,
// it should give up the leader partition and reacquire it once the DDB store is reachable again.
// Not giving up would leave this node holding on to an ownership-expired record, which creates inconsistent-state issues:
// if you are not the owner in the DDB table, you are not supposed to hold the ownership.
leaderPartition = null;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please include a unit test for this scenario and the expected outcome.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a unit test to validate reacquiring scenario.

LOG.error("Failed to save Leader partition state. This process will retry.");
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -18,6 +19,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
Expand All @@ -27,7 +29,6 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.junit.jupiter.api.Assertions.assertThrows;

@ExtendWith(MockitoExtension.class)
public class LeaderSchedulerTest {
Expand Down Expand Up @@ -169,13 +170,56 @@ void testWhileLoopRunnningAfterTheSleep() throws InterruptedException {
void testSetLeaderProgressState_throwsExceptionOnStateMismatch() {
    // Purpose: a LeaderPartition built around one progress-state type must reject
    // an attempt to replace that state with a different type.

    // The partition starts out holding a TokenPaginationCrawlerLeaderProgressState.
    LeaderPartition leaderPartition = new LeaderPartition(new TokenPaginationCrawlerLeaderProgressState(""));

    // A PaginationCrawlerLeaderProgressState is a different state type from the one above.
    PaginationCrawlerLeaderProgressState incompatibleState = new PaginationCrawlerLeaderProgressState(Instant.now());

    // Setting the mismatched state is expected to raise a RuntimeException.
    // The thrown exception itself is not inspected, so the return value is not captured.
    assertThrows(RuntimeException.class, () -> leaderPartition.setLeaderProgressState(incompatibleState));
}

@Test

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can move your comment to the test name to make it easier to read.

@Test("Ensure that if DynamoDB becomes unreachable, the leader gives up the partition and retries acquisition")

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added @DisplayName annotation with the description

@DisplayName("Ensure that if DynamoDB becomes unreachable, the leader gives up the partition and retries acquisition")
void testLeaderPartitionGivenUpOnSaveFailure_andRetryAcquire() throws InterruptedException {
    // Scenario: the scheduler acquires the leader partition, the first attempt to save its
    // progress state fails, and the scheduler is then expected to acquire the partition again
    // and keep crawling.

    // Leader partition whose token-pagination state is already initialized.
    LeaderPartition partition = new LeaderPartition(new TokenPaginationCrawlerLeaderProgressState(""));
    TokenPaginationCrawlerLeaderProgressState progressState =
            (TokenPaginationCrawlerLeaderProgressState) partition.getProgressState().get();
    progressState.setInitialized(true);
    progressState.setLastToken(LAST_TOKEN);

    // Short lease interval so several scheduler iterations fit into the wait below.
    LeaderScheduler scheduler = new LeaderScheduler(coordinator, crawler);
    scheduler.setLeaseInterval(Duration.ofMillis(10));

    // Every acquisition attempt hands back the same partition: the initial one and the
    // one made after the save failure.
    when(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE))
            .thenReturn(Optional.of(partition))
            .thenReturn(Optional.of(partition));

    // The first save throws (simulated DynamoDB outage); later saves succeed.
    RuntimeException outage = new RuntimeException("DynamoDB unreachable");
    doThrow(outage)
            .doNothing()
            .when(coordinator).saveProgressStateForPartition(any(LeaderPartition.class), any(Duration.class));

    // Run the scheduler on a background thread, let it loop for a while, then interrupt it.
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.submit(scheduler);
    Thread.sleep(100);
    executor.shutdownNow();

    // The crawler kept being invoked, i.e. the scheduler continued working after the failure.
    verify(crawler, atLeast(2)).crawl(any(LeaderPartition.class), any(EnhancedSourceCoordinator.class));

    // The partition was acquired at least twice: initially, and again after the failed save.
    verify(coordinator, atLeast(2)).acquireAvailablePartition(LeaderPartition.PARTITION_TYPE);

    // Saving the progress state was attempted more than once.
    verify(coordinator, atLeast(2)).saveProgressStateForPartition(any(LeaderPartition.class), any(Duration.class));
}
}
Loading