Skip to content

Commit c4fce8a

Browse files
committed
Fix where stream scheduler could die from unexpected error for mongodb source
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent 580b6ef commit c4fce8a

2 files changed

Lines changed: 43 additions & 1 deletion

File tree

data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,13 @@ public void run() {
9393
if (sourceConfig.isDisableS3ReadForLeader()) {
9494
System.clearProperty(STOP_S3_SCAN_PROCESSING_PROPERTY);
9595
}
96-
sourceCoordinator.giveUpPartition(streamPartition);
96+
try {
97+
sourceCoordinator.giveUpPartition(streamPartition);
98+
} catch (final Exception ex) {
99+
LOG.error("Failed to give up stream partition. Will attempt to reacquire this partition...");
100+
streamPartition = null;
101+
}
102+
97103
}
98104

99105
try {

data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamSchedulerTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@
2828
import java.util.concurrent.Future;
2929

3030
import static org.awaitility.Awaitility.await;
31+
import static org.junit.jupiter.api.Assertions.assertFalse;
3132
import static org.junit.jupiter.api.Assertions.assertThrows;
3233
import static org.mockito.ArgumentMatchers.any;
3334
import static org.mockito.ArgumentMatchers.eq;
3435
import static org.mockito.BDDMockito.given;
36+
import static org.mockito.Mockito.doThrow;
3537
import static org.mockito.Mockito.lenient;
3638
import static org.mockito.Mockito.mockStatic;
3739
import static org.mockito.Mockito.never;
@@ -150,6 +152,40 @@ void test_stream_runThrowsException() {
150152

151153
}
152154

155+
@Test
156+
void test_stream_giveUpPartitionThrowsException_schedulerContinuesRunning() {
157+
final String collection = UUID.randomUUID().toString();
158+
final StreamPartition streamPartition = new StreamPartition(collection, null);
159+
given(sourceCoordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).willReturn(Optional.of(streamPartition));
160+
given(collectionConfig.getCollection()).willReturn(collection);
161+
final int streamBatchSize = 1000;
162+
given(collectionConfig.getStreamBatchSize()).willReturn(streamBatchSize);
163+
doThrow(new RuntimeException("giveUpPartition failed"))
164+
.when(sourceCoordinator).giveUpPartition(any(StreamPartition.class));
165+
166+
final ExecutorService executorService = Executors.newSingleThreadExecutor();
167+
final Future<?> future = executorService.submit(() -> {
168+
try (MockedStatic<StreamWorker> streamWorkerMockedStatic = mockStatic(StreamWorker.class)) {
169+
streamWorkerMockedStatic.when(() -> StreamWorker.create(any(RecordBufferWriter.class), any(PartitionKeyRecordConverter.class), eq(sourceConfig),
170+
any(StreamAcknowledgementManager.class), any(DataStreamPartitionCheckpoint.class), eq(pluginMetrics), eq(DEFAULT_RECORD_FLUSH_BATCH_SIZE),
171+
eq(DEFAULT_CHECKPOINT_INTERVAL_MILLS), eq(DEFAULT_BUFFER_WRITE_INTERVAL_MILLS), eq(streamBatchSize), any(DocumentDBSourceAggregateMetrics.class)))
172+
.thenThrow(RuntimeException.class);
173+
streamScheduler.run();
174+
}
175+
});
176+
177+
await()
178+
.atMost(Duration.ofSeconds(5))
179+
.untilAsserted(() -> verify(sourceCoordinator).giveUpPartition(streamPartition));
180+
181+
// Scheduler is still running — didn't crash from the giveUpPartition exception.
182+
// streamPartition is reset to null, enabling clean reacquisition on the next iteration.
183+
assertFalse(future.isDone());
184+
185+
future.cancel(true);
186+
executorService.shutdownNow();
187+
}
188+
153189
@Test
154190
void test_stream_sourceCoordinatorThrowsException() {
155191
final StreamPartition streamPartition = new StreamPartition(UUID.randomUUID().toString(), null);

0 commit comments

Comments
 (0)