Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ private void createDataFilePartitions(final String exportArn,
LOG.info("Total of {} data files generated for export {}", dataFileInfo.size(), exportArn);
AtomicInteger totalRecords = new AtomicInteger();
AtomicInteger totalFiles = new AtomicInteger();

// Currently, we need to maintain a global state to track the overall progress.
// So that we can easily tell if all the export files are loaded
LoadStatus loadStatus = new LoadStatus(totalFiles.get(), 0, totalRecords.get(), 0);
enhancedSourceCoordinator.createPartition(new GlobalState(exportArn, Optional.of(loadStatus.toMap())));

dataFileInfo.forEach((key, size) -> {
DataFileProgressState progressState = new DataFileProgressState();
progressState.setTotal(size);
Expand All @@ -208,11 +214,6 @@ private void createDataFilePartitions(final String exportArn,

exportS3ObjectsTotalCounter.increment(totalFiles.get());
exportRecordsTotalCounter.increment(totalRecords.get());

// Currently, we need to maintain a global state to track the overall progress.
// So that we can easily tell if all the export files are loaded
LoadStatus loadStatus = new LoadStatus(totalFiles.get(), 0, totalRecords.get(), 0);
enhancedSourceCoordinator.createPartition(new GlobalState(exportArn, Optional.of(loadStatus.toMap())));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.ExportPartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.ExportProgressState;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.ExportSummary;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
Expand All @@ -30,6 +32,7 @@
import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;

import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
Expand All @@ -40,6 +43,7 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.BDDMockito.given;
Expand Down Expand Up @@ -171,7 +175,16 @@ public void test_run_exportJob_correctly() throws InterruptedException {
verify(dynamoDBClient, times(2)).describeExport(any(DescribeExportRequest.class));

// Create 2 data file partitions + 1 global state
verify(coordinator, times(3)).createPartition(any(EnhancedSourcePartition.class));
final ArgumentCaptor<EnhancedSourcePartition> argumentCaptorPartitionsCreated = ArgumentCaptor.forClass(EnhancedSourcePartition.class);

verify(coordinator, times(3)).createPartition(argumentCaptorPartitionsCreated.capture());

final List<EnhancedSourcePartition> createdPartitions = argumentCaptorPartitionsCreated.getAllValues();
assertThat(createdPartitions.size(), equalTo(3));
assertThat(createdPartitions.get(0), instanceOf(GlobalState.class));
assertThat(createdPartitions.get(1), instanceOf(DataFilePartition.class));
assertThat(createdPartitions.get(2), instanceOf(DataFilePartition.class));

// Complete the export partition
verify(coordinator).completePartition(any(EnhancedSourcePartition.class));
verify(exportJobSuccess).increment();
Expand Down
Loading