Skip to content

Commit aaefdf3

Browse files
authored
Fix where stream and leader scheduler could die from unexpected error for mongodb source (#6638)
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent d5d255a commit aaefdf3

4 files changed

Lines changed: 81 additions & 2 deletions

File tree

data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,12 @@ public void run() {
9999
if(leaderPartition != null) {
100100
// Extend the timeout
101101
// will always be a leader until shutdown
102-
coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES));
102+
try {
103+
coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES));
104+
} catch (Exception ex) {
105+
LOG.error("Failed to update ownership for leader partition. Will attempt to reacquire this partition...");
106+
leaderPartition = null;
107+
}
103108
}
104109
try {
105110
Thread.sleep(leaseInterval.toMillis());

data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,13 @@ public void run() {
9393
if (sourceConfig.isDisableS3ReadForLeader()) {
9494
System.clearProperty(STOP_S3_SCAN_PROCESSING_PROPERTY);
9595
}
96-
sourceCoordinator.giveUpPartition(streamPartition);
96+
try {
97+
sourceCoordinator.giveUpPartition(streamPartition);
98+
} catch (final Exception ex) {
99+
LOG.error("Failed to give up stream partition. Will attempt to reacquire this partition...");
100+
streamPartition = null;
101+
}
102+
97103
}
98104

99105
try {

data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderSchedulerTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,11 @@
3333
import static org.hamcrest.Matchers.instanceOf;
3434
import static org.hamcrest.Matchers.is;
3535
import static org.hamcrest.Matchers.startsWith;
36+
import static org.junit.jupiter.api.Assertions.assertFalse;
3637
import static org.junit.jupiter.api.Assertions.assertThrows;
3738
import static org.mockito.ArgumentMatchers.eq;
3839
import static org.mockito.BDDMockito.given;
40+
import static org.mockito.Mockito.doThrow;
3941
import static org.mockito.Mockito.atLeast;
4042
import static org.mockito.Mockito.times;
4143
import static org.mockito.Mockito.verify;
@@ -290,4 +292,34 @@ void test_shouldInitStream_withEmptyS3PathPrefix() {
290292
void test_shouldInitStream_withNullS3PathPrefix() {
291293
assertThrows(IllegalArgumentException.class, () -> new LeaderScheduler(coordinator, mongoDBSourceConfig, null, Duration.ofMillis(100)));
292294
}
295+
296+
@Test
297+
void test_saveProgressStateThrowsException_schedulerContinuesAndReacquiresLeader() {
298+
given(mongoDBSourceConfig.getCollections()).willReturn(List.of());
299+
leaderScheduler = new LeaderScheduler(coordinator, mongoDBSourceConfig, TEST_S3_PATH_PREFIX, Duration.ofMillis(100));
300+
leaderPartition = new LeaderPartition();
301+
given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition));
302+
doThrow(new RuntimeException("saveProgressState failed"))
303+
.when(coordinator).saveProgressStateForPartition(eq(leaderPartition), eq(Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES)));
304+
305+
final ExecutorService executorService = Executors.newSingleThreadExecutor();
306+
final Future<?> future = executorService.submit(() -> leaderScheduler.run());
307+
308+
// Verify saveProgressStateForPartition was called (and threw internally)
309+
await()
310+
.atMost(Duration.ofSeconds(2))
311+
.untilAsserted(() -> verify(coordinator, atLeast(1)).saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES)));
312+
313+
// Verify the scheduler reacquires the leader after failure —
314+
// leaderPartition was reset to null, so acquireAvailablePartition is called again
315+
await()
316+
.atMost(Duration.ofSeconds(2))
317+
.untilAsserted(() -> verify(coordinator, atLeast(2)).acquireAvailablePartition(LeaderPartition.PARTITION_TYPE));
318+
319+
// Scheduler is still running — didn't crash
320+
assertFalse(future.isDone());
321+
322+
future.cancel(true);
323+
executorService.shutdownNow();
324+
}
293325
}

data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamSchedulerTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@
2828
import java.util.concurrent.Future;
2929

3030
import static org.awaitility.Awaitility.await;
31+
import static org.junit.jupiter.api.Assertions.assertFalse;
3132
import static org.junit.jupiter.api.Assertions.assertThrows;
3233
import static org.mockito.ArgumentMatchers.any;
3334
import static org.mockito.ArgumentMatchers.eq;
3435
import static org.mockito.BDDMockito.given;
36+
import static org.mockito.Mockito.doThrow;
3537
import static org.mockito.Mockito.lenient;
3638
import static org.mockito.Mockito.mockStatic;
3739
import static org.mockito.Mockito.never;
@@ -150,6 +152,40 @@ void test_stream_runThrowsException() {
150152

151153
}
152154

155+
@Test
156+
void test_stream_giveUpPartitionThrowsException_schedulerContinuesRunning() {
157+
final String collection = UUID.randomUUID().toString();
158+
final StreamPartition streamPartition = new StreamPartition(collection, null);
159+
given(sourceCoordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).willReturn(Optional.of(streamPartition));
160+
given(collectionConfig.getCollection()).willReturn(collection);
161+
final int streamBatchSize = 1000;
162+
given(collectionConfig.getStreamBatchSize()).willReturn(streamBatchSize);
163+
doThrow(new RuntimeException("giveUpPartition failed"))
164+
.when(sourceCoordinator).giveUpPartition(any(StreamPartition.class));
165+
166+
final ExecutorService executorService = Executors.newSingleThreadExecutor();
167+
final Future<?> future = executorService.submit(() -> {
168+
try (MockedStatic<StreamWorker> streamWorkerMockedStatic = mockStatic(StreamWorker.class)) {
169+
streamWorkerMockedStatic.when(() -> StreamWorker.create(any(RecordBufferWriter.class), any(PartitionKeyRecordConverter.class), eq(sourceConfig),
170+
any(StreamAcknowledgementManager.class), any(DataStreamPartitionCheckpoint.class), eq(pluginMetrics), eq(DEFAULT_RECORD_FLUSH_BATCH_SIZE),
171+
eq(DEFAULT_CHECKPOINT_INTERVAL_MILLS), eq(DEFAULT_BUFFER_WRITE_INTERVAL_MILLS), eq(streamBatchSize), any(DocumentDBSourceAggregateMetrics.class)))
172+
.thenThrow(RuntimeException.class);
173+
streamScheduler.run();
174+
}
175+
});
176+
177+
await()
178+
.atMost(Duration.ofSeconds(5))
179+
.untilAsserted(() -> verify(sourceCoordinator).giveUpPartition(streamPartition));
180+
181+
// Scheduler is still running — didn't crash from the giveUpPartition exception.
182+
// streamPartition is reset to null, enabling clean reacquisition on the next iteration.
183+
assertFalse(future.isDone());
184+
185+
future.cancel(true);
186+
executorService.shutdownNow();
187+
}
188+
153189
@Test
154190
void test_stream_sourceCoordinatorThrowsException() {
155191
final StreamPartition streamPartition = new StreamPartition(UUID.randomUUID().toString(), null);

0 commit comments

Comments
 (0)