Skip to content

Commit bfda518

Browse files
authored
Fix possible missing file count in data file loader (#6639)
Signed-off-by: Hai Yan <oeyh@amazon.com>
1 parent 2c95e70 commit bfda518

2 files changed

Lines changed: 85 additions & 9 deletions

File tree

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,11 @@ public class DataFileScheduler implements Runnable {
5151

5252
private static final Duration DEFAULT_UPDATE_LOAD_STATUS_TIMEOUT = Duration.ofMinutes(30);
5353

54-
54+
static final int CREATE_PARTITION_MAX_RETRIES = 3;
5555
static final String EXPORT_S3_OBJECTS_PROCESSED_COUNT = "exportS3ObjectsProcessed";
5656
static final String EXPORT_S3_OBJECTS_ERROR_COUNT = "exportS3ObjectsErrors";
5757
static final String ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE = "activeExportS3ObjectConsumers";
5858

59-
6059
private final EnhancedSourceCoordinator sourceCoordinator;
6160
private final ExecutorService executor;
6261
private final RdsSourceConfig sourceConfig;
@@ -176,35 +175,47 @@ private void processDataFilePartition(DataFilePartition dataFilePartition) {
176175
numOfWorkers.incrementAndGet();
177176
}
178177

179-
private void updateLoadStatus(String exportTaskId, Duration timeout) {
178+
void updateLoadStatus(String exportTaskId, Duration timeout) {
180179

181180
Instant endTime = Instant.now().plus(timeout);
181+
LoadStatus loadStatus = null;
182+
boolean savedSuccessfully = false;
182183
// Keep retrying in case update fails due to conflicts until timed out
183184
while (Instant.now().isBefore(endTime)) {
184185
Optional<EnhancedSourcePartition> globalStatePartition = sourceCoordinator.getPartition(exportTaskId);
185186
if (globalStatePartition.isEmpty()) {
186187
LOG.error("Failed to get data file load status for {}", exportTaskId);
187-
return;
188+
// Transient read failures should be retried
189+
continue;
188190
}
189191

190192
GlobalState globalState = (GlobalState) globalStatePartition.get();
191-
LoadStatus loadStatus = LoadStatus.fromMap(globalState.getProgressState().get());
193+
loadStatus = LoadStatus.fromMap(globalState.getProgressState().get());
192194
loadStatus.setLoadedFiles(loadStatus.getLoadedFiles() + 1);
193195
LOG.info("Current data file load status: total {} loaded {}", loadStatus.getTotalFiles(), loadStatus.getLoadedFiles());
194196

195197
globalState.setProgressState(loadStatus.toMap());
196198

197199
try {
198200
sourceCoordinator.saveProgressStateForPartition(globalState, null);
199-
if (sourceConfig.isStreamEnabled() && loadStatus.getLoadedFiles() == loadStatus.getTotalFiles()) {
200-
LOG.info("All exports are done, streaming can continue...");
201-
sourceCoordinator.createPartition(new GlobalState("stream-for-" + sourceConfig.getDbIdentifier(), null));
202-
}
201+
savedSuccessfully = true;
203202
break;
204203
} catch (Exception e) {
205204
LOG.error("Failed to update the global status, looks like the status was out of date, will retry..");
206205
}
207206
}
207+
208+
if (savedSuccessfully && sourceConfig.isStreamEnabled() && loadStatus.getLoadedFiles() == loadStatus.getTotalFiles()) {
209+
LOG.info("All exports are done, streaming can continue...");
210+
for (int attempt = 0; attempt < CREATE_PARTITION_MAX_RETRIES; attempt++) {
211+
try {
212+
sourceCoordinator.createPartition(new GlobalState("stream-for-" + sourceConfig.getDbIdentifier(), null));
213+
break;
214+
} catch (Exception e) {
215+
LOG.error("Failed to create stream trigger partition for {} (attempt {}), will retry", sourceConfig.getDbIdentifier(), attempt + 1, e);
216+
}
217+
}
218+
}
208219
}
209220

210221
private BiConsumer<Void, Throwable> completeDataLoader(DataFilePartition dataFilePartition) {

data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,13 @@
4343
import static org.awaitility.Awaitility.await;
4444
import static org.mockito.ArgumentMatchers.any;
4545
import static org.mockito.ArgumentMatchers.eq;
46+
import static org.mockito.ArgumentMatchers.isNull;
4647
import static org.mockito.Mockito.doNothing;
4748
import static org.mockito.Mockito.doThrow;
4849
import static org.mockito.Mockito.mock;
4950
import static org.mockito.Mockito.mockStatic;
5051
import static org.mockito.Mockito.never;
52+
import static org.mockito.Mockito.times;
5153
import static org.mockito.Mockito.verify;
5254
import static org.mockito.Mockito.verifyNoInteractions;
5355
import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -187,6 +189,69 @@ void test_data_file_loader_throws_exception_then_give_up_partition() {
187189
verify(sourceCoordinator).giveUpPartition(dataFilePartition);
188190
}
189191

192+
@Test
193+
void test_when_getPartition_initially_returns_empty_then_retries_and_updates_load_status() {
194+
final String exportTaskId = UUID.randomUUID().toString();
195+
196+
final GlobalState loadStatusGlobalState = mock(GlobalState.class);
197+
final int totalFiles = 5;
198+
final Map<String, Object> loadStatusMap = new LoadStatus(totalFiles, totalFiles - 2).toMap();
199+
when(loadStatusGlobalState.getProgressState()).thenReturn(Optional.of(loadStatusMap));
200+
201+
// First call returns empty (transient failure), second call returns valid state
202+
when(sourceCoordinator.getPartition(exportTaskId))
203+
.thenReturn(Optional.empty())
204+
.thenReturn(Optional.of(loadStatusGlobalState));
205+
206+
createObjectUnderTest().updateLoadStatus(exportTaskId, Duration.ofSeconds(2));
207+
208+
verify(sourceCoordinator).saveProgressStateForPartition(any(GlobalState.class), isNull());
209+
}
210+
211+
@Test
212+
void test_when_last_file_loaded_with_stream_enabled_then_creates_stream_trigger_partition() {
213+
final String exportTaskId = UUID.randomUUID().toString();
214+
final String dbIdentifier = UUID.randomUUID().toString();
215+
when(sourceConfig.isStreamEnabled()).thenReturn(true);
216+
when(sourceConfig.getDbIdentifier()).thenReturn(dbIdentifier);
217+
218+
final GlobalState loadStatusGlobalState = mock(GlobalState.class);
219+
final int totalFiles = 3;
220+
final Map<String, Object> loadStatusMap = new LoadStatus(totalFiles, totalFiles - 1).toMap();
221+
when(loadStatusGlobalState.getProgressState()).thenReturn(Optional.of(loadStatusMap));
222+
when(sourceCoordinator.getPartition(exportTaskId)).thenReturn(Optional.of(loadStatusGlobalState));
223+
224+
createObjectUnderTest().updateLoadStatus(exportTaskId, Duration.ofSeconds(2));
225+
226+
verify(sourceCoordinator).createPartition(any(GlobalState.class));
227+
}
228+
229+
@Test
230+
void test_when_createPartition_throws_transiently_then_retries_and_partition_is_completed() {
231+
final String exportTaskId = UUID.randomUUID().toString();
232+
final String dbIdentifier = UUID.randomUUID().toString();
233+
when(sourceConfig.isStreamEnabled()).thenReturn(true);
234+
when(sourceConfig.getDbIdentifier()).thenReturn(dbIdentifier);
235+
236+
final GlobalState loadStatusGlobalState = mock(GlobalState.class);
237+
final int totalFiles = 3;
238+
final Map<String, Object> loadStatusMap = new LoadStatus(totalFiles, totalFiles - 1).toMap();
239+
when(loadStatusGlobalState.getProgressState()).thenReturn(Optional.of(loadStatusMap));
240+
when(sourceCoordinator.getPartition(exportTaskId)).thenReturn(Optional.of(loadStatusGlobalState));
241+
242+
// createPartition throws once then succeeds — verifies retry behavior
243+
when(sourceCoordinator.createPartition(any(GlobalState.class)))
244+
.thenThrow(new RuntimeException("DynamoDB error"))
245+
.thenReturn(true);
246+
247+
createObjectUnderTest().updateLoadStatus(exportTaskId, Duration.ofSeconds(2));
248+
249+
// saveProgressStateForPartition must be called exactly once — no retry/overshoot
250+
verify(sourceCoordinator, times(1)).saveProgressStateForPartition(any(GlobalState.class), isNull());
251+
// createPartition retried: first threw, second succeeded
252+
verify(sourceCoordinator, times(2)).createPartition(any(GlobalState.class));
253+
}
254+
190255
@Disabled("Flaky test, needs to be fixed")
191256
@Test
192257
void test_shutdown() throws InterruptedException {

0 commit comments

Comments
 (0)