Skip to content

Commit 359b7d0

Browse files
committed
Renew the unit test to handle the two scans and seperate buckets
Signed-off-by: Siqi Ding <dingdd@amazon.com>
1 parent 107e594 commit 359b7d0

1 file changed

Lines changed: 31 additions & 9 deletions

File tree

data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -611,16 +611,14 @@ void object_is_not_filtered_out_based_on_last_modified_timestamp_when_delete_obj
611611
final ArgumentCaptor<ListObjectsV2Request> listObjectsV2RequestArgumentCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class);
612612
given(s3Client.listObjectsV2(listObjectsV2RequestArgumentCaptor.capture())).willReturn(listObjectsResponse);
613613

614+
final ArgumentCaptor<List<PartitionIdentifier>> createPartitionsArgumentCaptor = ArgumentCaptor.forClass(List.class);
615+
doNothing().when(sourceCoordinator).createPartitions(createPartitionsArgumentCaptor.capture());
616+
614617
final Map<String, Object> globalStateMap = new HashMap<>();
615618

616619
final Instant beforeFirstScan = Instant.now();
617620
final List<PartitionIdentifier> resultingPartitions = partitionCreationSupplier.apply(globalStateMap);
618-
619-
assertThat(resultingPartitions, notNullValue());
620-
assertThat(resultingPartitions.size(), equalTo(expectedPartitionIdentifiers.size()));
621-
assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()),
622-
containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey)
623-
.map(Matchers::equalTo).collect(Collectors.toList())));
621+
assertThat(resultingPartitions.isEmpty(), equalTo(true));
624622

625623
assertThat(globalStateMap, notNullValue());
626624
assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true));
@@ -634,9 +632,33 @@ void object_is_not_filtered_out_based_on_last_modified_timestamp_when_delete_obj
634632

635633
final Instant beforeSecondScan = Instant.now();
636634
final List<PartitionIdentifier> secondScanPartitions = partitionCreationSupplier.apply(globalStateMap);
637-
assertThat(secondScanPartitions.size(), equalTo(expectedPartitionIdentifiersSecondScan.size()));
638-
assertThat(secondScanPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()),
639-
containsInAnyOrder(expectedPartitionIdentifiersSecondScan.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList())));
635+
assertThat(secondScanPartitions.isEmpty(), equalTo(true));
636+
final List<List<PartitionIdentifier>> createdPartitions = createPartitionsArgumentCaptor.getAllValues();
637+
assertThat(createdPartitions.size(), equalTo(4));
638+
639+
final List<PartitionIdentifier> firstScanFirstBucketPartitions = createdPartitions.get(0);
640+
final List<PartitionIdentifier> firstScanSecondBucketPartitions = createdPartitions.get(1);
641+
642+
final List<PartitionIdentifier> allFirstScanPartitions = new ArrayList<>();
643+
allFirstScanPartitions.addAll(firstScanFirstBucketPartitions);
644+
allFirstScanPartitions.addAll(firstScanSecondBucketPartitions);
645+
646+
assertThat(allFirstScanPartitions.size(), equalTo(expectedPartitionIdentifiers.size()));
647+
assertThat(allFirstScanPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()),
648+
containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey)
649+
.map(Matchers::equalTo).collect(Collectors.toList())));
650+
651+
final List<PartitionIdentifier> secondScanFirstBucketPartitions = createdPartitions.get(2);
652+
final List<PartitionIdentifier> secondScanSecondBucketPartitions = createdPartitions.get(3);
653+
654+
final List<PartitionIdentifier> allSecondScanPartitions = new ArrayList<>();
655+
allSecondScanPartitions.addAll(secondScanFirstBucketPartitions);
656+
allSecondScanPartitions.addAll(secondScanSecondBucketPartitions);
657+
658+
assertThat(allSecondScanPartitions.size(), equalTo(expectedPartitionIdentifiersSecondScan.size()));
659+
assertThat(allSecondScanPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()),
660+
containsInAnyOrder(expectedPartitionIdentifiersSecondScan.stream().map(PartitionIdentifier::getPartitionKey)
661+
.map(Matchers::equalTo).collect(Collectors.toList())));
640662

641663
assertThat(globalStateMap, notNullValue());
642664
assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true));

0 commit comments

Comments
 (0)