Skip to content

Commit 4ec2a7c

Browse files
authored
Fix bug that would cause ddb stream processing to start before export completed (opensearch-project#6892)
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent 6769025 commit 4ec2a7c

2 files changed

Lines changed: 8 additions & 1 deletion

File tree

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ private void createDataFilePartitions(final String exportArn,
197197

198198
// Currently, we need to maintain a global state to track the overall progress.
199199
// So that we can easily tell if all the export files are loaded
200-
LoadStatus loadStatus = new LoadStatus(totalFiles.get(), 0, totalRecords.get(), 0);
200+
LoadStatus loadStatus = new LoadStatus(dataFileInfo.size(), 0, totalRecords.get(), 0);
201201
enhancedSourceCoordinator.createPartition(new GlobalState(exportArn, Optional.of(loadStatus.toMap())));
202202

203203
dataFileInfo.forEach((key, size) -> {

data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState;
2121
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.ExportProgressState;
2222
import org.opensearch.dataprepper.plugins.source.dynamodb.model.ExportSummary;
23+
import org.opensearch.dataprepper.plugins.source.dynamodb.model.LoadStatus;
2324
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
2425
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
2526
import software.amazon.awssdk.services.dynamodb.model.DescribeExportRequest;
@@ -185,6 +186,12 @@ public void test_run_exportJob_correctly() throws InterruptedException {
185186
assertThat(createdPartitions.get(1), instanceOf(DataFilePartition.class));
186187
assertThat(createdPartitions.get(2), instanceOf(DataFilePartition.class));
187188

189+
// Verify the GlobalState LoadStatus has accurate totalFiles count
190+
GlobalState globalState = (GlobalState) createdPartitions.get(0);
191+
LoadStatus loadStatus = LoadStatus.fromMap(globalState.getProgressState().get());
192+
assertThat(loadStatus.getTotalFiles(), equalTo(2));
193+
assertThat(loadStatus.getLoadedFiles(), equalTo(0));
194+
188195
// Complete the export partition
189196
verify(coordinator).completePartition(any(EnhancedSourcePartition.class));
190197
verify(exportJobSuccess).increment();

0 commit comments

Comments
 (0)