@@ -611,16 +611,14 @@ void object_is_not_filtered_out_based_on_last_modified_timestamp_when_delete_obj
611611 final ArgumentCaptor <ListObjectsV2Request > listObjectsV2RequestArgumentCaptor = ArgumentCaptor .forClass (ListObjectsV2Request .class );
612612 given (s3Client .listObjectsV2 (listObjectsV2RequestArgumentCaptor .capture ())).willReturn (listObjectsResponse );
613613
614+ final ArgumentCaptor <List <PartitionIdentifier >> createPartitionsArgumentCaptor = ArgumentCaptor .forClass (List .class );
615+ doNothing ().when (sourceCoordinator ).createPartitions (createPartitionsArgumentCaptor .capture ());
616+
614617 final Map <String , Object > globalStateMap = new HashMap <>();
615618
616619 final Instant beforeFirstScan = Instant .now ();
617620 final List <PartitionIdentifier > resultingPartitions = partitionCreationSupplier .apply (globalStateMap );
618-
619- assertThat (resultingPartitions , notNullValue ());
620- assertThat (resultingPartitions .size (), equalTo (expectedPartitionIdentifiers .size ()));
621- assertThat (resultingPartitions .stream ().map (PartitionIdentifier ::getPartitionKey ).collect (Collectors .toList ()),
622- containsInAnyOrder (expectedPartitionIdentifiers .stream ().map (PartitionIdentifier ::getPartitionKey )
623- .map (Matchers ::equalTo ).collect (Collectors .toList ())));
621+ assertThat (resultingPartitions .isEmpty (), equalTo (true ));
624622
625623 assertThat (globalStateMap , notNullValue ());
626624 assertThat (globalStateMap .containsKey (SCAN_COUNT ), equalTo (true ));
@@ -634,9 +632,33 @@ void object_is_not_filtered_out_based_on_last_modified_timestamp_when_delete_obj
634632
635633 final Instant beforeSecondScan = Instant .now ();
636634 final List <PartitionIdentifier > secondScanPartitions = partitionCreationSupplier .apply (globalStateMap );
637- assertThat (secondScanPartitions .size (), equalTo (expectedPartitionIdentifiersSecondScan .size ()));
638- assertThat (secondScanPartitions .stream ().map (PartitionIdentifier ::getPartitionKey ).collect (Collectors .toList ()),
639- containsInAnyOrder (expectedPartitionIdentifiersSecondScan .stream ().map (PartitionIdentifier ::getPartitionKey ).map (Matchers ::equalTo ).collect (Collectors .toList ())));
635+ assertThat (secondScanPartitions .isEmpty (), equalTo (true ));
636+ final List <List <PartitionIdentifier >> createdPartitions = createPartitionsArgumentCaptor .getAllValues ();
637+ assertThat (createdPartitions .size (), equalTo (4 ));
638+
639+ final List <PartitionIdentifier > firstScanFirstBucketPartitions = createdPartitions .get (0 );
640+ final List <PartitionIdentifier > firstScanSecondBucketPartitions = createdPartitions .get (1 );
641+
642+ final List <PartitionIdentifier > allFirstScanPartitions = new ArrayList <>();
643+ allFirstScanPartitions .addAll (firstScanFirstBucketPartitions );
644+ allFirstScanPartitions .addAll (firstScanSecondBucketPartitions );
645+
646+ assertThat (allFirstScanPartitions .size (), equalTo (expectedPartitionIdentifiers .size ()));
647+ assertThat (allFirstScanPartitions .stream ().map (PartitionIdentifier ::getPartitionKey ).collect (Collectors .toList ()),
648+ containsInAnyOrder (expectedPartitionIdentifiers .stream ().map (PartitionIdentifier ::getPartitionKey )
649+ .map (Matchers ::equalTo ).collect (Collectors .toList ())));
650+
651+ final List <PartitionIdentifier > secondScanFirstBucketPartitions = createdPartitions .get (2 );
652+ final List <PartitionIdentifier > secondScanSecondBucketPartitions = createdPartitions .get (3 );
653+
654+ final List <PartitionIdentifier > allSecondScanPartitions = new ArrayList <>();
655+ allSecondScanPartitions .addAll (secondScanFirstBucketPartitions );
656+ allSecondScanPartitions .addAll (secondScanSecondBucketPartitions );
657+
658+ assertThat (allSecondScanPartitions .size (), equalTo (expectedPartitionIdentifiersSecondScan .size ()));
659+ assertThat (allSecondScanPartitions .stream ().map (PartitionIdentifier ::getPartitionKey ).collect (Collectors .toList ()),
660+ containsInAnyOrder (expectedPartitionIdentifiersSecondScan .stream ().map (PartitionIdentifier ::getPartitionKey )
661+ .map (Matchers ::equalTo ).collect (Collectors .toList ())));
640662
641663 assertThat (globalStateMap , notNullValue ());
642664 assertThat (globalStateMap .containsKey (SCAN_COUNT ), equalTo (true ));
0 commit comments