Skip to content

Commit 1fac731

Browse files
committed
Implement for leader scheduler in mongodb source
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent c4fce8a commit 1fac731

2 files changed

Lines changed: 38 additions & 1 deletion

File tree

data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,12 @@ public void run() {
9999
if(leaderPartition != null) {
100100
// Extend the timeout
101101
// will always be a leader until shutdown
102-
coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES));
102+
try {
103+
coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES));
104+
} catch (Exception ex) {
105+
LOG.error("Failed to update ownership for leader partition. Will attempt to reacquire this partition...");
106+
leaderPartition = null;
107+
}
103108
}
104109
try {
105110
Thread.sleep(leaseInterval.toMillis());

data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderSchedulerTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,11 @@
3333
import static org.hamcrest.Matchers.instanceOf;
3434
import static org.hamcrest.Matchers.is;
3535
import static org.hamcrest.Matchers.startsWith;
36+
import static org.junit.jupiter.api.Assertions.assertFalse;
3637
import static org.junit.jupiter.api.Assertions.assertThrows;
3738
import static org.mockito.ArgumentMatchers.eq;
3839
import static org.mockito.BDDMockito.given;
40+
import static org.mockito.Mockito.doThrow;
3941
import static org.mockito.Mockito.atLeast;
4042
import static org.mockito.Mockito.times;
4143
import static org.mockito.Mockito.verify;
@@ -290,4 +292,34 @@ void test_shouldInitStream_withEmptyS3PathPrefix() {
290292
void test_shouldInitStream_withNullS3PathPrefix() {
291293
assertThrows(IllegalArgumentException.class, () -> new LeaderScheduler(coordinator, mongoDBSourceConfig, null, Duration.ofMillis(100)));
292294
}
295+
296+
@Test
297+
void test_saveProgressStateThrowsException_schedulerContinuesAndReacquiresLeader() {
298+
given(mongoDBSourceConfig.getCollections()).willReturn(List.of());
299+
leaderScheduler = new LeaderScheduler(coordinator, mongoDBSourceConfig, TEST_S3_PATH_PREFIX, Duration.ofMillis(100));
300+
leaderPartition = new LeaderPartition();
301+
given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition));
302+
doThrow(new RuntimeException("saveProgressState failed"))
303+
.when(coordinator).saveProgressStateForPartition(eq(leaderPartition), eq(Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES)));
304+
305+
final ExecutorService executorService = Executors.newSingleThreadExecutor();
306+
final Future<?> future = executorService.submit(() -> leaderScheduler.run());
307+
308+
// Verify saveProgressStateForPartition was called (and threw internally)
309+
await()
310+
.atMost(Duration.ofSeconds(2))
311+
.untilAsserted(() -> verify(coordinator, atLeast(1)).saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES)));
312+
313+
// Verify the scheduler reacquires the leader after failure —
314+
// leaderPartition was reset to null, so acquireAvailablePartition is called again
315+
await()
316+
.atMost(Duration.ofSeconds(2))
317+
.untilAsserted(() -> verify(coordinator, atLeast(2)).acquireAvailablePartition(LeaderPartition.PARTITION_TYPE));
318+
319+
// Scheduler is still running — didn't crash
320+
assertFalse(future.isDone());
321+
322+
future.cancel(true);
323+
executorService.shutdownNow();
324+
}
293325
}

0 commit comments

Comments
 (0)