Skip to content

Commit cb6ddcf

Browse files
author
Brendan Benner
committed
Stage LeaderPartition & LeaderSchedulerTest
1 parent 2b255b4 commit cb6ddcf

2 files changed

Lines changed: 83 additions & 2 deletions

File tree

  • data-prepper-plugins/saas-source-plugins/source-crawler/src
    • main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/partition
    • test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/partition/LeaderPartition.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition;
22

3+
import org.opensearch.dataprepper.plugins.source.source_crawler.base.TokenLeaderProgressState;
34
import com.fasterxml.jackson.core.JsonFactory;
45
import com.fasterxml.jackson.core.JsonProcessingException;
56
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -56,7 +57,22 @@ public Optional<LeaderProgressState> getProgressState() {
5657
}
5758

5859
public void setLeaderProgressState(LeaderProgressState state) {
59-
this.state.setLastPollTime(state.getLastPollTime());
60+
boolean stateIsToken = state instanceof TokenLeaderProgressState;
61+
boolean thisStateIsToken = this.state instanceof TokenLeaderProgressState;
62+
63+
if (stateIsToken != thisStateIsToken) {
64+
// Validate that the states are not inconsistent
65+
// We don't expect to reach here.
66+
throw new RuntimeException("Leader partition progress state type mismatch: " +
67+
"Provided state type: " + state.getClass().getSimpleName() +
68+
", Current state type: " + this.state.getClass().getSimpleName());
69+
}
70+
71+
if (state instanceof TokenLeaderProgressState) {
72+
((TokenLeaderProgressState) this.state).setLastToken(((TokenLeaderProgressState) state).getLastToken());
73+
} else {
74+
this.state.setLastPollTime(state.getLastPollTime());
75+
}
6076
}
6177

6278
/**
@@ -77,4 +93,4 @@ public LeaderProgressState convertToPartitionState(final String serializedPartit
7793
return null;
7894
}
7995
}
80-
}
96+
}

data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.opensearch.dataprepper.plugins.source.source_crawler.base.Crawler;
1111
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
1212
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerLeaderProgressState;
13+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState;
1314

1415
import java.time.Duration;
1516
import java.time.Instant;
@@ -26,10 +27,13 @@
2627
import static org.mockito.Mockito.verify;
2728
import static org.mockito.Mockito.verifyNoInteractions;
2829
import static org.mockito.Mockito.when;
30+
import static org.junit.jupiter.api.Assertions.assertThrows;
2931

3032
@ExtendWith(MockitoExtension.class)
3133
public class LeaderSchedulerTest {
3234

35+
private static String LAST_TOKEN = "sample-token-123";
36+
3337
@Mock
3438
private EnhancedSourceCoordinator coordinator;
3539
@Mock
@@ -47,6 +51,29 @@ void testUnableToAcquireLeaderPartition() throws InterruptedException {
4751
verifyNoInteractions(crawler);
4852
}
4953

54+
@ParameterizedTest
55+
@ValueSource(booleans = {true, false})
56+
void testTokenPaginationCrawlerLeaderPartitionsCreation(boolean initializationState) throws InterruptedException {
57+
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, crawler);
58+
LeaderPartition leaderPartition = new LeaderPartition(new TokenPaginationCrawlerLeaderProgressState(""));
59+
TokenPaginationCrawlerLeaderProgressState state = (TokenPaginationCrawlerLeaderProgressState) leaderPartition.getProgressState().get();
60+
state.setInitialized(initializationState);
61+
state.setLastToken(LAST_TOKEN);
62+
given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition));
63+
doThrow(RuntimeException.class).when(coordinator).saveProgressStateForPartition(any(LeaderPartition.class), any(Duration.class));
64+
65+
ExecutorService executorService = Executors.newSingleThreadExecutor();
66+
executorService.submit(leaderScheduler);
67+
68+
Thread.sleep(100);
69+
executorService.shutdownNow();
70+
71+
// Check if crawler was invoked and updated leader lease renewal time
72+
verify(crawler, times(1)).crawl(leaderPartition, coordinator);
73+
verify(coordinator, times(1)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));
74+
75+
}
76+
5077
@ParameterizedTest
5178
@ValueSource(booleans = {true, false})
5279
void testPaginationCrawlerLeaderPartitionsCreation(boolean initializationState) throws InterruptedException {
@@ -90,6 +117,30 @@ void testExceptionWhileAcquiringLeaderPartition(boolean initializationState) thr
90117
verifyNoInteractions(crawler);
91118
}
92119

120+
@Test
121+
void testTokenPaginationCrawlerWhileLoopRunnningAfterTheSleep() throws InterruptedException {
122+
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, crawler);
123+
leaderScheduler.setLeaseInterval(Duration.ofMillis(10));
124+
LeaderPartition leaderPartition = new LeaderPartition(new TokenPaginationCrawlerLeaderProgressState(""));
125+
TokenPaginationCrawlerLeaderProgressState state = (TokenPaginationCrawlerLeaderProgressState) leaderPartition.getProgressState().get();
126+
state.setInitialized(false);
127+
state.setLastToken(LAST_TOKEN);
128+
when(crawler.crawl(any(LeaderPartition.class), any(EnhancedSourceCoordinator.class))).thenReturn(Instant.ofEpochMilli(10));
129+
when(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE))
130+
.thenReturn(Optional.of(leaderPartition))
131+
.thenThrow(RuntimeException.class);
132+
133+
ExecutorService executorService = Executors.newSingleThreadExecutor();
134+
executorService.submit(leaderScheduler);
135+
136+
//Wait for more than a minute as the default while loop wait time in leader scheduler is 1 minute
137+
Thread.sleep(100);
138+
executorService.shutdownNow();
139+
140+
// Check if crawler was invoked and updated leader lease renewal time
141+
verify(crawler, atLeast(2)).crawl(any(LeaderPartition.class), any(EnhancedSourceCoordinator.class));
142+
}
143+
93144
@Test
94145
void testWhileLoopRunnningAfterTheSleep() throws InterruptedException {
95146
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, crawler);
@@ -113,4 +164,18 @@ void testWhileLoopRunnningAfterTheSleep() throws InterruptedException {
113164
// Check if crawler was invoked and updated leader lease renewal time
114165
verify(crawler, atLeast(2)).crawl(any(LeaderPartition.class), any(EnhancedSourceCoordinator.class));
115166
}
167+
168+
@Test
169+
void testSetLeaderProgressState_throwsExceptionOnStateMismatch() {
170+
// Create a LeaderPartition with TokenPaginationCrawlerLeaderProgressState
171+
LeaderPartition leaderPartition = new LeaderPartition(new TokenPaginationCrawlerLeaderProgressState(""));
172+
173+
// Try to set a different type of state (PaginationCrawlerLeaderProgressState)
174+
PaginationCrawlerLeaderProgressState incompatibleState = new PaginationCrawlerLeaderProgressState(Instant.now());
175+
176+
// Verify that RuntimeException is thrown due to state type mismatch
177+
RuntimeException exception = assertThrows(RuntimeException.class, () -> {
178+
leaderPartition.setLeaderProgressState(incompatibleState);
179+
});
180+
}
116181
}

0 commit comments

Comments
 (0)