diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java index 28afef2067..c38e657bca 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java @@ -63,6 +63,12 @@ public interface SourceCoordinator { */ Optional> getNextPartition(final Function, List> partitionCreationSupplier, final boolean forceSupplier); + /** + * Can be used to directly create partitions for source coordination, as an alternative to relying on getNextPartition to create partitions. + * @param partitionIdentifiers - The partitions to be created. + */ + void createPartitions(final List partitionIdentifiers); + /** * Should be called by the source when it has fully processed a given partition * @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotFoundException if the partition key could not be found in the distributed store diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java index fe8784ddb3..ed68fe3c86 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java @@ -60,6 +60,8 @@ public class LeaseBasedSourceCoordinator implements SourceCoordinator { static final String PARTITION_NOT_OWNED_ERROR_COUNT = "partitionNotOwnedErrors"; static final String PARTITION_UPDATE_ERROR_COUNT = "PartitionUpdateErrors"; static final Duration DEFAULT_LEASE_TIMEOUT = Duration.ofMinutes(10); + static final Duration DEFAULT_RENEW_TIMEOUT = Duration.ofMinutes(3); + static final Duration OWNERSHIP_RENEWAL_INTERVAL = Duration.ofMinutes(3); private static final String hostName; static final String PARTITION_TYPE = "PARTITION"; @@ -94,6 +96,7 @@ public class LeaseBasedSourceCoordinator implements SourceCoordinator { private final ReentrantLock lock; private Instant lastSupplierRunTime; + private Instant lastGlobalOwnershipRenewal; static final Duration FORCE_SUPPLIER_AFTER_DURATION = Duration.ofMinutes(5); @@ -136,6 +139,7 @@ public LeaseBasedSourceCoordinator(final Class partitionProgressStateClass, this.partitionsDeleted = pluginMetrics.counter(PARTITIONS_DELETED); this.lock = new ReentrantLock(); this.lastSupplierRunTime = Instant.now(); + this.lastGlobalOwnershipRenewal = Instant.now(); } @Override @@ -202,8 +206,13 @@ private Optional> getNextPartitionInternal(final Function partitionIdentifiers) { + @Override + public void createPartitions(final List partitionIdentifiers) { + renewGlobalOwnershipIfNeeded(); + for (final PartitionIdentifier partitionIdentifier : partitionIdentifiers) { + renewGlobalOwnershipIfNeeded(); + final Optional optionalPartitionItem = sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, partitionIdentifier.getPartitionKey()); if (optionalPartitionItem.isPresent()) { @@ -514,6 +523,28 @@ private Optional acquireGlobalStateForPartitionCreatio return globalStateItemForPartitionCreation; } + private void renewGlobalStateOwnershipForPartitionCreation() { + try { + final Optional globalStateItem = sourceCoordinationStore.getSourcePartitionItem( + sourceIdentifierWithGlobalStateType, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS); + + if (globalStateItem.isPresent() && ownerId.equals(globalStateItem.get().getPartitionOwner())) { + globalStateItem.get().setPartitionOwnershipTimeout(Instant.now().plus(DEFAULT_LEASE_TIMEOUT)); + sourceCoordinationStore.tryUpdateSourcePartitionItem(globalStateItem.get()); + } + } catch (final Exception e) { + LOG.warn("Failed to renew global state ownership for partition creation", e); + } + } + + private void renewGlobalOwnershipIfNeeded() { + if (Instant.now().isAfter(lastGlobalOwnershipRenewal.plus(OWNERSHIP_RENEWAL_INTERVAL))) { + LOG.info("Renewing global state ownership for partition creation"); + renewGlobalStateOwnershipForPartitionCreation(); + lastGlobalOwnershipRenewal = Instant.now(); + } + } + private void giveUpAndSaveGlobalStateForPartitionCreation(final SourcePartitionStoreItem globalStateItemForPartitionCreation, final Map globalStateMap) { globalStateItemForPartitionCreation.setSourcePartitionStatus(SourcePartitionStatus.UNASSIGNED); diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java index 98f59fde95..b95c737209 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.source.s3; import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier; +import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.plugins.source.s3.configuration.FolderPartitioningOptions; import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption; import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanSchedulingOptions; @@ -53,12 +54,15 @@ public class S3ScanPartitionCreationSupplier implements Function sourceCoordinator; + public S3ScanPartitionCreationSupplier(final S3Client s3Client, final BucketOwnerProvider bucketOwnerProvider, final List scanOptionsList, final S3ScanSchedulingOptions schedulingOptions, final FolderPartitioningOptions folderPartitioningOptions, - final boolean deleteS3ObjectsOnRead) { + final boolean deleteS3ObjectsOnRead, + final SourceCoordinator sourceCoordinator) { this.s3Client = s3Client; this.bucketOwnerProvider = bucketOwnerProvider; @@ -66,6 +70,7 @@ public S3ScanPartitionCreationSupplier(final S3Client s3Client, this.schedulingOptions = schedulingOptions; this.folderPartitioningOptions = folderPartitioningOptions; this.deleteS3ObjectsOnRead = deleteS3ObjectsOnRead; + this.sourceCoordinator = sourceCoordinator; } @Override @@ -99,12 +104,13 @@ public List apply(final Map globalStateMap) if (Objects.nonNull(s3ScanKeyPathOption) && Objects.nonNull(s3ScanKeyPathOption.getS3scanIncludePrefixOptions())) s3ScanKeyPathOption.getS3scanIncludePrefixOptions().forEach(includePath -> { listObjectsV2Request.prefix(includePath); - objectsToProcess.addAll(listFilteredS3ObjectsForBucket(excludeItems, listObjectsV2Request, - bucketName, scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime(), globalStateMap)); + createFilteredS3ObjectPartitionsForBucket(excludeItems, listObjectsV2Request, + bucketName, scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime(), globalStateMap); }); else - objectsToProcess.addAll(listFilteredS3ObjectsForBucket(excludeItems, listObjectsV2Request, - bucketName, scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime(), globalStateMap)); + createFilteredS3ObjectPartitionsForBucket(excludeItems, listObjectsV2Request, + bucketName, scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime(), globalStateMap); + if (!bucketScanTime.containsKey(bucketName)) { bucketScanTime.put(bucketName, updatedScanTime.toString()); } @@ -117,10 +123,11 @@ public List apply(final Map globalStateMap) globalStateMap.put(SCAN_COUNT, (Integer) globalStateMap.get(SCAN_COUNT) + 1); globalStateMap.put(LAST_SCAN_TIME, Instant.now().toEpochMilli()); - return objectsToProcess; + // Partitions are created by the supplier, so source coordinator does not need to attempt to create any partitions + return Collections.emptyList(); } - private List listFilteredS3ObjectsForBucket(final List excludeKeyPaths, + private void createFilteredS3ObjectPartitionsForBucket(final List excludeKeyPaths, final ListObjectsV2Request.Builder listObjectsV2Request, final String bucket, final LocalDateTime startDateTime, @@ -128,11 +135,11 @@ private List listFilteredS3ObjectsForBucket(final List globalStateMap) { final Instant previousScanTime = globalStateMap.get(bucket) != null ? Instant.parse((String) globalStateMap.get(bucket)) : null; final boolean isFirstScan = previousScanTime == null; - final List allPartitionIdentifiers = new ArrayList<>(); ListObjectsV2Response listObjectsV2Response = null; + do { listObjectsV2Response = s3Client.listObjectsV2(listObjectsV2Request.fetchOwner(true).continuationToken(Objects.nonNull(listObjectsV2Response) ? listObjectsV2Response.nextContinuationToken() : null).build()); - allPartitionIdentifiers.addAll(listObjectsV2Response.contents().stream() + final List partitionsForPage = listObjectsV2Response.contents().stream() .filter(s3Object -> deleteS3ObjectsOnRead || isLastModifiedTimeAfterMostRecentScanForBucket(previousScanTime, s3Object)) .map(s3Object -> Pair.of(s3Object.key(), instantToLocalDateTime(s3Object.lastModified()))) .filter(keyTimestampPair -> !keyTimestampPair.left().endsWith("/")) @@ -141,32 +148,17 @@ private List listFilteredS3ObjectsForBucket(final List isKeyMatchedBetweenTimeRange(keyTimestampPair.right(), startDateTime, endDateTime, isFirstScan)) .map(Pair::left) .map(objectKey -> PartitionIdentifier.builder().withPartitionKey(String.format(BUCKET_OBJECT_PARTITION_KEY_FORMAT, bucket, objectKey)).build()) - .collect(Collectors.toList())); - + .collect(Collectors.toList()); LOG.info("Found page of {} objects from bucket {}", listObjectsV2Response.keyCount(), bucket); - } while (listObjectsV2Response.isTruncated()); - - if (folderPartitioningOptions != null) { - final Set folderPartitions = allPartitionIdentifiers.stream() - .map(partitionIdentifier -> { - final String fullObjectKey = partitionIdentifier.getPartitionKey(); - final String prefix = getPrefixWithDepth(fullObjectKey); - if (prefix == null) { - return null; - } - return PartitionIdentifier.builder().withPartitionKey(prefix).build(); - }) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); - - LOG.info("Running in folder_partitions mode at depth {}, found {} unique prefixes from {} objects", folderPartitioningOptions.getFolderDepth(), folderPartitions.size(), allPartitionIdentifiers.size()); - - return new ArrayList<>(folderPartitions); - } else { - LOG.info("Returning partitions for {} S3 objects from bucket {}", allPartitionIdentifiers.size(), bucket); - } - return allPartitionIdentifiers; + if (folderPartitioningOptions != null) { + final List folderPartitionsForPage = getFolderPartitionIdentifiers(partitionsForPage); + sourceCoordinator.createPartitions(folderPartitionsForPage); + } else { + LOG.info("Creating partitions for {} S3 objects from bucket {}", partitionsForPage.size(), bucket); + sourceCoordinator.createPartitions(partitionsForPage); + } + } while (listObjectsV2Response.isTruncated()); } private LocalDateTime instantToLocalDateTime(final Instant instant) { @@ -264,4 +256,23 @@ private String getPrefixWithDepth(final String fullObjectKey) { int actualDepth = min(folderPartitioningOptions.getFolderDepth(), folders.length - 1); return String.join("/", Arrays.copyOfRange(folders, 0, actualDepth)) + "/"; } + + private List getFolderPartitionIdentifiers(final List objectPartitionIdentifiers) { + + final Set folderPartitions = objectPartitionIdentifiers.stream() + .map(partitionIdentifier -> { + final String fullObjectKey = partitionIdentifier.getPartitionKey(); + final String prefix = getPrefixWithDepth(fullObjectKey); + if (prefix == null) { + return null; + } + return PartitionIdentifier.builder().withPartitionKey(prefix).build(); + }) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + LOG.info("Running in folder_partitions mode at depth {}, found {} unique prefixes from {} objects", folderPartitioningOptions.getFolderDepth(), folderPartitions.size(), objectPartitionIdentifiers.size()); + + return new ArrayList<>(folderPartitions); + } } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java index eb807398f1..625e9e0aa6 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java @@ -156,7 +156,7 @@ public ScanObjectWorker(final S3Client s3Client, this.folderPartitioningOptions = s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(); this.acknowledgmentSetTimeout = s3SourceConfig.getS3ScanScanOptions().getAcknowledgmentTimeout(); - this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(), s3SourceConfig.isDeleteS3ObjectsOnRead()); + this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(), s3SourceConfig.isDeleteS3ObjectsOnRead(), sourceCoordinator); this.acknowledgmentsRemainingForPartitions = new ConcurrentHashMap<>(); this.objectsToDeleteForAcknowledgmentSets = new ConcurrentHashMap<>(); } @@ -373,7 +373,6 @@ private boolean shouldDeleteFolderPartition(final SourcePartition objectsToProcess, final SourcePartition folderPartition) { int objectsProcessed = 0; diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java index 5214399d4b..5567d25437 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java @@ -13,6 +13,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier; +import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.plugins.source.s3.configuration.FolderPartitioningOptions; import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanBucketOption; import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption; @@ -47,9 +48,11 @@ import static org.hamcrest.Matchers.notNullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.source.s3.S3ScanPartitionCreationSupplier.LAST_SCAN_TIME; import static org.opensearch.dataprepper.plugins.source.s3.S3ScanPartitionCreationSupplier.SCAN_COUNT; @@ -63,6 +66,9 @@ public class S3ScanPartitionCreationSupplierTest { @Mock private BucketOwnerProvider bucketOwnerProvider; + @Mock + private SourceCoordinator sourceCoordinator; + private List scanOptionsList; private S3ScanSchedulingOptions schedulingOptions; @@ -80,7 +86,7 @@ void setup() { private Function, List> createObjectUnderTest() { - return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions, isDeleteS3ObjectsOnRead); + return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions, isDeleteS3ObjectsOnRead, sourceCoordinator); } @Test @@ -120,7 +126,8 @@ void getNextPartition_supplier_without_scheduling_options_returns_expected_Parti final Function, List> partitionCreationSupplier = createObjectUnderTest(); - final List expectedPartitionIdentifiers = new ArrayList<>(); + final List expectedPartitionIdentifiersFirstBucket = new ArrayList<>(); + final List expectedPartitionIdentifiersSecondBucket = new ArrayList<>(); final ListObjectsV2Response listObjectsResponse = mock(ListObjectsV2Response.class); final List s3ObjectsList = new ArrayList<>(); @@ -134,7 +141,7 @@ void getNextPartition_supplier_without_scheduling_options_returns_expected_Parti given(invalidForFirstBucketSuffixObject.key()).willReturn("test.invalid"); given(invalidForFirstBucketSuffixObject.lastModified()).willReturn(Instant.now()); s3ObjectsList.add(invalidForFirstBucketSuffixObject); - expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + invalidForFirstBucketSuffixObject.key()).build()); + expectedPartitionIdentifiersSecondBucket.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + invalidForFirstBucketSuffixObject.key()).build()); final S3Object invalidDueToLastModifiedOutsideOfStartEndObject = mock(S3Object.class); given(invalidDueToLastModifiedOutsideOfStartEndObject.key()).willReturn(UUID.randomUUID().toString()); @@ -145,14 +152,18 @@ void getNextPartition_supplier_without_scheduling_options_returns_expected_Parti given(validObject.key()).willReturn("valid"); given(validObject.lastModified()).willReturn(Instant.now()); s3ObjectsList.add(validObject); - expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build()); - expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build()); + expectedPartitionIdentifiersFirstBucket.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build()); + expectedPartitionIdentifiersSecondBucket.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build()); given(listObjectsResponse.contents()).willReturn(s3ObjectsList); final ArgumentCaptor listObjectsV2RequestArgumentCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class); given(s3Client.listObjectsV2(listObjectsV2RequestArgumentCaptor.capture())).willReturn(listObjectsResponse); + final ArgumentCaptor> createPartitionsArgumentCaptor = ArgumentCaptor.forClass(List.class); + doNothing().when(sourceCoordinator).createPartitions(createPartitionsArgumentCaptor.capture()); + + final Map globalStateMap = new HashMap<>(); final List resultingPartitions = partitionCreationSupplier.apply(globalStateMap); @@ -162,12 +173,20 @@ void getNextPartition_supplier_without_scheduling_options_returns_expected_Parti globalStateMap.put(secondBucket, null); + final List> createdPartitions = createPartitionsArgumentCaptor.getAllValues(); + assertThat(createdPartitions.size(), equalTo(2)); + assertThat(createdPartitions.get(0).size(), equalTo(expectedPartitionIdentifiersFirstBucket.size())); + assertThat(createdPartitions.get(0).stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiersFirstBucket.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + + assertThat(createdPartitions.get(1).size(), equalTo(expectedPartitionIdentifiersSecondBucket.size())); + assertThat(createdPartitions.get(1).stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiersSecondBucket.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + assertThat(partitionCreationSupplier.apply(globalStateMap), equalTo(Collections.emptyList())); + assertThat(resultingPartitions.isEmpty(), equalTo(true)); - assertThat(resultingPartitions, notNullValue()); - assertThat(resultingPartitions.size(), equalTo(expectedPartitionIdentifiers.size())); - assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), - containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + verifyNoMoreInteractions(sourceCoordinator); } @Test @@ -205,7 +224,8 @@ void getNextPartition_supplier_with_scheduling_options_returns_expected_Partitio final Function, List> partitionCreationSupplier = createObjectUnderTest(); - final List expectedPartitionIdentifiers = new ArrayList<>(); + final List expectedPartitionIdentifiersFirstBucket = new ArrayList<>(); + final List expectedPartitionIdentifiersSecondBucket = new ArrayList<>(); final ListObjectsV2Response listObjectsResponse = mock(ListObjectsV2Response.class); final List s3ObjectsList = new ArrayList<>(); @@ -219,26 +239,27 @@ void getNextPartition_supplier_with_scheduling_options_returns_expected_Partitio given(invalidForFirstBucketSuffixObject.key()).willReturn("test.invalid"); given(invalidForFirstBucketSuffixObject.lastModified()).willReturn(Instant.now().minusSeconds(2)); s3ObjectsList.add(invalidForFirstBucketSuffixObject); - expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + invalidForFirstBucketSuffixObject.key()).build()); + expectedPartitionIdentifiersSecondBucket.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + invalidForFirstBucketSuffixObject.key()).build()); final Instant mostRecentFirstScan = Instant.now().plusSeconds(2); final S3Object validObject = mock(S3Object.class); given(validObject.key()).willReturn("valid"); given(validObject.lastModified()).willReturn(mostRecentFirstScan); s3ObjectsList.add(validObject); - expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build()); - expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build()); + expectedPartitionIdentifiersFirstBucket.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build()); + expectedPartitionIdentifiersSecondBucket.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build()); final S3Object secondScanObject = mock(S3Object.class); final Instant mostRecentSecondScan = Instant.now().plusSeconds(10); given(secondScanObject.key()).willReturn("second-scan"); given(secondScanObject.lastModified()).willReturn(mostRecentSecondScan); - final List expectedPartitionIdentifiersSecondScan = new ArrayList<>(); - expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + secondScanObject.key()).build()); - expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build()); - expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + secondScanObject.key()).build()); - expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build()); + final List expectedPartitionIdentifiersSecondScanFirstBucket = new ArrayList<>(); + final List expectedPartitionIdentifiersSecondScanSecondBucket = new ArrayList<>(); + expectedPartitionIdentifiersSecondScanFirstBucket.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + secondScanObject.key()).build()); + expectedPartitionIdentifiersSecondScanFirstBucket.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build()); + expectedPartitionIdentifiersSecondScanSecondBucket.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + secondScanObject.key()).build()); + expectedPartitionIdentifiersSecondScanSecondBucket.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build()); final List secondScanObjects = new ArrayList<>(s3ObjectsList); secondScanObjects.add(secondScanObject); @@ -251,16 +272,15 @@ void getNextPartition_supplier_with_scheduling_options_returns_expected_Partitio final ArgumentCaptor listObjectsV2RequestArgumentCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class); given(s3Client.listObjectsV2(listObjectsV2RequestArgumentCaptor.capture())).willReturn(listObjectsResponse); + final ArgumentCaptor> createPartitionsArgumentCaptor = ArgumentCaptor.forClass(List.class); + doNothing().when(sourceCoordinator).createPartitions(createPartitionsArgumentCaptor.capture()); + final Map globalStateMap = new HashMap<>(); final Instant beforeFirstScan = Instant.now(); final List resultingPartitions = partitionCreationSupplier.apply(globalStateMap); - assertThat(resultingPartitions, notNullValue()); - assertThat(resultingPartitions.size(), equalTo(expectedPartitionIdentifiers.size())); - assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), - containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey) - .map(Matchers::equalTo).collect(Collectors.toList()))); + assertThat(resultingPartitions.isEmpty(), equalTo(true)); assertThat(globalStateMap, notNullValue()); assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true)); @@ -274,9 +294,25 @@ void getNextPartition_supplier_with_scheduling_options_returns_expected_Partitio final Instant beforeSecondScan = Instant.now(); final List secondScanPartitions = partitionCreationSupplier.apply(globalStateMap); - assertThat(secondScanPartitions.size(), equalTo(expectedPartitionIdentifiersSecondScan.size())); - assertThat(secondScanPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), - containsInAnyOrder(expectedPartitionIdentifiersSecondScan.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + assertThat(secondScanPartitions.isEmpty(), equalTo(true)); + + final List> createdPartitions = createPartitionsArgumentCaptor.getAllValues(); + assertThat(createdPartitions.size(), equalTo(4)); + assertThat(createdPartitions.get(0).size(), equalTo(expectedPartitionIdentifiersFirstBucket.size())); + assertThat(createdPartitions.get(0).stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiersFirstBucket.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + + assertThat(createdPartitions.get(1).size(), equalTo(expectedPartitionIdentifiersSecondBucket.size())); + assertThat(createdPartitions.get(1).stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiersSecondBucket.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + + assertThat(createdPartitions.get(2).size(), equalTo(expectedPartitionIdentifiersSecondScanFirstBucket.size())); + assertThat(createdPartitions.get(2).stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiersSecondScanFirstBucket.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + + assertThat(createdPartitions.get(3).size(), equalTo(expectedPartitionIdentifiersSecondScanSecondBucket.size())); + assertThat(createdPartitions.get(3).stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiersSecondScanSecondBucket.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); assertThat(globalStateMap, notNullValue()); assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true)); @@ -291,6 +327,8 @@ void getNextPartition_supplier_with_scheduling_options_returns_expected_Partitio assertThat(partitionCreationSupplier.apply(globalStateMap), equalTo(Collections.emptyList())); + verifyNoMoreInteractions(sourceCoordinator); + verify(listObjectsResponse, times(4)).contents(); } @@ -338,6 +376,9 @@ void scheduled_scan_filters_on_start_time_and_end_time_for_the_first_scan_and_fi given(validObject.lastModified()).willReturn(objectAfterStartAndEndTime); s3ObjectsList.add(validObject); + final ArgumentCaptor> createPartitionsArgumentCaptor = ArgumentCaptor.forClass(List.class); + doNothing().when(sourceCoordinator).createPartitions(createPartitionsArgumentCaptor.capture()); + final Instant objectBeforeStartTime = Instant.ofEpochMilli(startMillis).minus(500L, TimeUnit.SECONDS.toChronoUnit()); final S3Object invalidObject = mock(S3Object.class); given(invalidObject.key()).willReturn("invalid"); @@ -355,12 +396,20 @@ void scheduled_scan_filters_on_start_time_and_end_time_for_the_first_scan_and_fi final Function, List> partitionCreationSupplier = createObjectUnderTest(); final List firstScanPartitions = partitionCreationSupplier.apply(globalStateMap); - assertThat(firstScanPartitions.size(), equalTo(expectedPartitionIdentifiers.size())); - assertThat(firstScanPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + assertThat(firstScanPartitions.isEmpty(), equalTo(true)); + + final List> createdPartitions = createPartitionsArgumentCaptor.getAllValues(); + assertThat(createdPartitions.size(), equalTo(2)); + assertThat(createdPartitions.get(0).isEmpty(), equalTo(true)); + + assertThat(createdPartitions.get(1).size(), equalTo(expectedPartitionIdentifiers.size())); + assertThat(createdPartitions.get(1).stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); final List secondScanPartitions = partitionCreationSupplier.apply(globalStateMap); assertThat(secondScanPartitions.isEmpty(), equalTo(true)); + + verifyNoMoreInteractions(sourceCoordinator); } @Test @@ -403,7 +452,11 @@ void getNextPartition_with_folder_partitioning_enabled_returns_the_expected_part final Function, List> partitionCreationSupplier = createObjectUnderTest(); - final List expectedPartitionIdentifiers = new ArrayList<>(); + final ArgumentCaptor> createPartitionsArgumentCaptor = ArgumentCaptor.forClass(List.class); + doNothing().when(sourceCoordinator).createPartitions(createPartitionsArgumentCaptor.capture()); + + final List expectedPartitionIdentifiersFirstBucket = new ArrayList<>(); + final List expectedPartitionIdentifiersSecondBucket = new ArrayList<>(); final ListObjectsV2Response listObjectsResponse = mock(ListObjectsV2Response.class); final List s3ObjectsList = new ArrayList<>(); @@ -417,7 +470,7 @@ void getNextPartition_with_folder_partitioning_enabled_returns_the_expected_part given(invalidForFirstBucketSuffixObject.key()).willReturn("folder-1/test.invalid"); given(invalidForFirstBucketSuffixObject.lastModified()).willReturn(Instant.now()); s3ObjectsList.add(invalidForFirstBucketSuffixObject); - expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + "folder-1/").build()); + expectedPartitionIdentifiersSecondBucket.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + "folder-1/").build()); final S3Object invalidDueToLastModifiedOutsideOfStartEndObject = mock(S3Object.class); given(invalidDueToLastModifiedOutsideOfStartEndObject.key()).willReturn(UUID.randomUUID().toString()); @@ -428,14 +481,14 @@ void getNextPartition_with_folder_partitioning_enabled_returns_the_expected_part given(validObject.key()).willReturn("folder-1/valid"); given(validObject.lastModified()).willReturn(Instant.now()); s3ObjectsList.add(validObject); - expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + "folder-1/").build()); + expectedPartitionIdentifiersFirstBucket.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + "folder-1/").build()); final S3Object newFolderObject = mock(S3Object.class); given(newFolderObject.key()).willReturn("folder-2/valid"); given(newFolderObject.lastModified()).willReturn(Instant.now()); s3ObjectsList.add(newFolderObject); - expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + "folder-2/").build()); - expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + "folder-2/").build()); + expectedPartitionIdentifiersSecondBucket.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + "folder-2/").build()); + expectedPartitionIdentifiersFirstBucket.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + "folder-2/").build()); final S3Object noDepthFoundForFolder = mock(S3Object.class); given(noDepthFoundForFolder.key()).willReturn("no_folder.json"); @@ -449,6 +502,7 @@ void getNextPartition_with_folder_partitioning_enabled_returns_the_expected_part final Map globalStateMap = new HashMap<>(); final List resultingPartitions = partitionCreationSupplier.apply(globalStateMap); + assertThat(resultingPartitions.isEmpty(), equalTo(true)); assertThat(globalStateMap, notNullValue()); assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true)); @@ -458,10 +512,18 @@ void getNextPartition_with_folder_partitioning_enabled_returns_the_expected_part assertThat(partitionCreationSupplier.apply(globalStateMap), equalTo(Collections.emptyList())); - assertThat(resultingPartitions, notNullValue()); - assertThat(resultingPartitions.size(), equalTo(expectedPartitionIdentifiers.size())); - assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), - containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + + final List> createdPartitions = createPartitionsArgumentCaptor.getAllValues(); + assertThat(createdPartitions.size(), equalTo(2)); + assertThat(createdPartitions.get(0).size(), equalTo(expectedPartitionIdentifiersFirstBucket.size())); + assertThat(createdPartitions.get(0).stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiersFirstBucket.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + + assertThat(createdPartitions.get(1).size(), equalTo(expectedPartitionIdentifiersSecondBucket.size())); + assertThat(createdPartitions.get(1).stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiersSecondBucket.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + + verifyNoMoreInteractions(sourceCoordinator); } @Test @@ -549,16 +611,14 @@ void object_is_not_filtered_out_based_on_last_modified_timestamp_when_delete_obj final ArgumentCaptor listObjectsV2RequestArgumentCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class); given(s3Client.listObjectsV2(listObjectsV2RequestArgumentCaptor.capture())).willReturn(listObjectsResponse); + final ArgumentCaptor> createPartitionsArgumentCaptor = ArgumentCaptor.forClass(List.class); + doNothing().when(sourceCoordinator).createPartitions(createPartitionsArgumentCaptor.capture()); + final Map globalStateMap = new HashMap<>(); final Instant beforeFirstScan = Instant.now(); final List resultingPartitions = partitionCreationSupplier.apply(globalStateMap); - - assertThat(resultingPartitions, notNullValue()); - assertThat(resultingPartitions.size(), equalTo(expectedPartitionIdentifiers.size())); - assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), - containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey) - .map(Matchers::equalTo).collect(Collectors.toList()))); + assertThat(resultingPartitions.isEmpty(), equalTo(true)); assertThat(globalStateMap, notNullValue()); assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true)); @@ -572,9 +632,33 @@ void object_is_not_filtered_out_based_on_last_modified_timestamp_when_delete_obj final Instant beforeSecondScan = Instant.now(); final List secondScanPartitions = partitionCreationSupplier.apply(globalStateMap); - assertThat(secondScanPartitions.size(), equalTo(expectedPartitionIdentifiersSecondScan.size())); - assertThat(secondScanPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), - containsInAnyOrder(expectedPartitionIdentifiersSecondScan.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + assertThat(secondScanPartitions.isEmpty(), equalTo(true)); + final List> createdPartitions = createPartitionsArgumentCaptor.getAllValues(); + assertThat(createdPartitions.size(), equalTo(4)); + + final List firstScanFirstBucketPartitions = createdPartitions.get(0); + final List firstScanSecondBucketPartitions = createdPartitions.get(1); + + final List allFirstScanPartitions = new ArrayList<>(); + allFirstScanPartitions.addAll(firstScanFirstBucketPartitions); + allFirstScanPartitions.addAll(firstScanSecondBucketPartitions); + + assertThat(allFirstScanPartitions.size(), equalTo(expectedPartitionIdentifiers.size())); + assertThat(allFirstScanPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey) + .map(Matchers::equalTo).collect(Collectors.toList()))); + + final List secondScanFirstBucketPartitions = createdPartitions.get(2); + final List secondScanSecondBucketPartitions = createdPartitions.get(3); + + final List allSecondScanPartitions = new ArrayList<>(); + allSecondScanPartitions.addAll(secondScanFirstBucketPartitions); + allSecondScanPartitions.addAll(secondScanSecondBucketPartitions); + + assertThat(allSecondScanPartitions.size(), equalTo(expectedPartitionIdentifiersSecondScan.size())); + assertThat(allSecondScanPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiersSecondScan.stream().map(PartitionIdentifier::getPartitionKey) + .map(Matchers::equalTo).collect(Collectors.toList()))); assertThat(globalStateMap, notNullValue()); assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true));