Skip to content

Commit 610e8e8

Browse files
authored
Fix race condition between export partition creation and data file partition completion for ddb source (#6651)
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent 363af8f commit 610e8e8

2 files changed

Lines changed: 20 additions & 6 deletions

File tree

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,12 @@ private void createDataFilePartitions(final String exportArn,
194194
LOG.info("Total of {} data files generated for export {}", dataFileInfo.size(), exportArn);
195195
AtomicInteger totalRecords = new AtomicInteger();
196196
AtomicInteger totalFiles = new AtomicInteger();
197+
198+
// Currently, we need to maintain a global state to track the overall progress.
199+
// So that we can easily tell if all the export files are loaded
200+
LoadStatus loadStatus = new LoadStatus(totalFiles.get(), 0, totalRecords.get(), 0);
201+
enhancedSourceCoordinator.createPartition(new GlobalState(exportArn, Optional.of(loadStatus.toMap())));
202+
197203
dataFileInfo.forEach((key, size) -> {
198204
DataFileProgressState progressState = new DataFileProgressState();
199205
progressState.setTotal(size);
@@ -208,11 +214,6 @@ private void createDataFilePartitions(final String exportArn,
208214

209215
exportS3ObjectsTotalCounter.increment(totalFiles.get());
210216
exportRecordsTotalCounter.increment(totalRecords.get());
211-
212-
// Currently, we need to maintain a global state to track the overall progress.
213-
// So that we can easily tell if all the export files are loaded
214-
LoadStatus loadStatus = new LoadStatus(totalFiles.get(), 0, totalRecords.get(), 0);
215-
enhancedSourceCoordinator.createPartition(new GlobalState(exportArn, Optional.of(loadStatus.toMap())));
216217
}
217218

218219

data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
import org.opensearch.dataprepper.metrics.PluginMetrics;
1616
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
1717
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
18+
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition;
1819
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.ExportPartition;
20+
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState;
1921
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.ExportProgressState;
2022
import org.opensearch.dataprepper.plugins.source.dynamodb.model.ExportSummary;
2123
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
@@ -30,6 +32,7 @@
3032
import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
3133

3234
import java.time.Instant;
35+
import java.util.List;
3336
import java.util.Map;
3437
import java.util.Optional;
3538
import java.util.UUID;
@@ -40,6 +43,7 @@
4043

4144
import static org.hamcrest.MatcherAssert.assertThat;
4245
import static org.hamcrest.Matchers.equalTo;
46+
import static org.hamcrest.Matchers.instanceOf;
4347
import static org.mockito.ArgumentMatchers.any;
4448
import static org.mockito.ArgumentMatchers.anyString;
4549
import static org.mockito.BDDMockito.given;
@@ -171,7 +175,16 @@ public void test_run_exportJob_correctly() throws InterruptedException {
171175
verify(dynamoDBClient, times(2)).describeExport(any(DescribeExportRequest.class));
172176

173177
// Create 2 data file partitions + 1 global state
174-
verify(coordinator, times(3)).createPartition(any(EnhancedSourcePartition.class));
178+
final ArgumentCaptor<EnhancedSourcePartition> argumentCaptorPartitionsCreated = ArgumentCaptor.forClass(EnhancedSourcePartition.class);
179+
180+
verify(coordinator, times(3)).createPartition(argumentCaptorPartitionsCreated.capture());
181+
182+
final List<EnhancedSourcePartition> createdPartitions = argumentCaptorPartitionsCreated.getAllValues();
183+
assertThat(createdPartitions.size(), equalTo(3));
184+
assertThat(createdPartitions.get(0), instanceOf(GlobalState.class));
185+
assertThat(createdPartitions.get(1), instanceOf(DataFilePartition.class));
186+
assertThat(createdPartitions.get(2), instanceOf(DataFilePartition.class));
187+
175188
// Complete the export partition
176189
verify(coordinator).completePartition(any(EnhancedSourcePartition.class));
177190
verify(exportJobSuccess).increment();

0 commit comments

Comments
 (0)