diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java index c8b14a6992..bac98f4ecb 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java @@ -99,7 +99,12 @@ public void run() { if(leaderPartition != null) { // Extend the timeout // will always be a leader until shutdown - coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES)); + try { + coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES)); + } catch (Exception ex) { + LOG.error("Failed to update ownership for leader partition. Will attempt to reacquire this partition..."); + leaderPartition = null; + } } try { Thread.sleep(leaseInterval.toMillis()); diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java index 40a80bb71f..60bac24d18 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java @@ -93,7 +93,13 @@ public void run() { if (sourceConfig.isDisableS3ReadForLeader()) { System.clearProperty(STOP_S3_SCAN_PROCESSING_PROPERTY); } - sourceCoordinator.giveUpPartition(streamPartition); + try { + sourceCoordinator.giveUpPartition(streamPartition); + } catch (final Exception ex) { + LOG.error("Failed to give up stream partition. Will attempt to reacquire this partition..."); + streamPartition = null; + } + } try { diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderSchedulerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderSchedulerTest.java index 41476ef8bd..6556c960ec 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderSchedulerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderSchedulerTest.java @@ -33,9 +33,11 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.startsWith; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -290,4 +292,34 @@ void test_shouldInitStream_withEmptyS3PathPrefix() { void test_shouldInitStream_withNullS3PathPrefix() { assertThrows(IllegalArgumentException.class, () -> new LeaderScheduler(coordinator, mongoDBSourceConfig, null, Duration.ofMillis(100))); } + + @Test + void test_saveProgressStateThrowsException_schedulerContinuesAndReacquiresLeader() { + given(mongoDBSourceConfig.getCollections()).willReturn(List.of()); + leaderScheduler = new LeaderScheduler(coordinator, mongoDBSourceConfig, TEST_S3_PATH_PREFIX, Duration.ofMillis(100)); + leaderPartition = new LeaderPartition(); + given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition)); + doThrow(new RuntimeException("saveProgressState failed")) + .when(coordinator).saveProgressStateForPartition(eq(leaderPartition), eq(Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES))); + + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + final Future future = executorService.submit(() -> leaderScheduler.run()); + + // Verify saveProgressStateForPartition was called (and threw internally) + await() + .atMost(Duration.ofSeconds(2)) + .untilAsserted(() -> verify(coordinator, atLeast(1)).saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES))); + + // Verify the scheduler reacquires the leader after failure — + // leaderPartition was reset to null, so acquireAvailablePartition is called again + await() + .atMost(Duration.ofSeconds(2)) + .untilAsserted(() -> verify(coordinator, atLeast(2)).acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)); + + // Scheduler is still running — didn't crash + assertFalse(future.isDone()); + + future.cancel(true); + executorService.shutdownNow(); + } } diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamSchedulerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamSchedulerTest.java index 9973b2bc14..138cd4e7f7 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamSchedulerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamSchedulerTest.java @@ -28,10 +28,12 @@ import java.util.concurrent.Future; import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; @@ -150,6 +152,40 @@ void test_stream_runThrowsException() { } + @Test + void test_stream_giveUpPartitionThrowsException_schedulerContinuesRunning() { + final String collection = UUID.randomUUID().toString(); + final StreamPartition streamPartition = new StreamPartition(collection, null); + given(sourceCoordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).willReturn(Optional.of(streamPartition)); + given(collectionConfig.getCollection()).willReturn(collection); + final int streamBatchSize = 1000; + given(collectionConfig.getStreamBatchSize()).willReturn(streamBatchSize); + doThrow(new RuntimeException("giveUpPartition failed")) + .when(sourceCoordinator).giveUpPartition(any(StreamPartition.class)); + + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + final Future future = executorService.submit(() -> { + try (MockedStatic streamWorkerMockedStatic = mockStatic(StreamWorker.class)) { + streamWorkerMockedStatic.when(() -> StreamWorker.create(any(RecordBufferWriter.class), any(PartitionKeyRecordConverter.class), eq(sourceConfig), + any(StreamAcknowledgementManager.class), any(DataStreamPartitionCheckpoint.class), eq(pluginMetrics), eq(DEFAULT_RECORD_FLUSH_BATCH_SIZE), + eq(DEFAULT_CHECKPOINT_INTERVAL_MILLS), eq(DEFAULT_BUFFER_WRITE_INTERVAL_MILLS), eq(streamBatchSize), any(DocumentDBSourceAggregateMetrics.class))) + .thenThrow(RuntimeException.class); + streamScheduler.run(); + } + }); + + await() + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> verify(sourceCoordinator).giveUpPartition(streamPartition)); + + // Scheduler is still running — didn't crash from the giveUpPartition exception. + // streamPartition is reset to null, enabling clean reacquisition on the next iteration. + assertFalse(future.isDone()); + + future.cancel(true); + executorService.shutdownNow(); + } + @Test void test_stream_sourceCoordinatorThrowsException() { final StreamPartition streamPartition = new StreamPartition(UUID.randomUUID().toString(), null);