From 107e59445680aacd3c128598f72dfa90bf36e169 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Wed, 9 Oct 2024 17:27:35 -0500 Subject: [PATCH 1/6] Add change to s3 scan source to create partitions after each page of objects listed Signed-off-by: Taylor Gray (cherry picked from commit bf74cb7677cd952a1db712797b56ed04d0e0ec2d) --- .../source/coordinator/SourceCoordinator.java | 6 + .../LeaseBasedSourceCoordinator.java | 3 +- .../s3/S3ScanPartitionCreationSupplier.java | 77 +++++----- .../plugins/source/s3/ScanObjectWorker.java | 3 +- .../S3ScanPartitionCreationSupplierTest.java | 136 +++++++++++++----- 5 files changed, 152 insertions(+), 73 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java index 28afef2067..c38e657bca 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java @@ -63,6 +63,12 @@ public interface SourceCoordinator { */ Optional> getNextPartition(final Function, List> partitionCreationSupplier, final boolean forceSupplier); + /** + * Can be used to directly create partitions for source coordination, as an alternative to relying on getNextPartition to create partitions. + * @param partitionIdentifiers - The partitions to be created. 
+ */ + void createPartitions(final List partitionIdentifiers); + /** * Should be called by the source when it has fully processed a given partition * @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotFoundException if the partition key could not be found in the distributed store diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java index fe8784ddb3..7dfcc6746b 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java @@ -202,7 +202,8 @@ private Optional> getNextPartitionInternal(final Function partitionIdentifiers) { + @Override + public void createPartitions(final List partitionIdentifiers) { for (final PartitionIdentifier partitionIdentifier : partitionIdentifiers) { final Optional optionalPartitionItem = sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, partitionIdentifier.getPartitionKey()); diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java index 98f59fde95..b95c737209 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.source.s3; import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier; +import 
org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.plugins.source.s3.configuration.FolderPartitioningOptions; import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption; import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanSchedulingOptions; @@ -53,12 +54,15 @@ public class S3ScanPartitionCreationSupplier implements Function sourceCoordinator; + public S3ScanPartitionCreationSupplier(final S3Client s3Client, final BucketOwnerProvider bucketOwnerProvider, final List scanOptionsList, final S3ScanSchedulingOptions schedulingOptions, final FolderPartitioningOptions folderPartitioningOptions, - final boolean deleteS3ObjectsOnRead) { + final boolean deleteS3ObjectsOnRead, + final SourceCoordinator sourceCoordinator) { this.s3Client = s3Client; this.bucketOwnerProvider = bucketOwnerProvider; @@ -66,6 +70,7 @@ public S3ScanPartitionCreationSupplier(final S3Client s3Client, this.schedulingOptions = schedulingOptions; this.folderPartitioningOptions = folderPartitioningOptions; this.deleteS3ObjectsOnRead = deleteS3ObjectsOnRead; + this.sourceCoordinator = sourceCoordinator; } @Override @@ -99,12 +104,13 @@ public List apply(final Map globalStateMap) if (Objects.nonNull(s3ScanKeyPathOption) && Objects.nonNull(s3ScanKeyPathOption.getS3scanIncludePrefixOptions())) s3ScanKeyPathOption.getS3scanIncludePrefixOptions().forEach(includePath -> { listObjectsV2Request.prefix(includePath); - objectsToProcess.addAll(listFilteredS3ObjectsForBucket(excludeItems, listObjectsV2Request, - bucketName, scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime(), globalStateMap)); + createFilteredS3ObjectPartitionsForBucket(excludeItems, listObjectsV2Request, + bucketName, scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime(), globalStateMap); }); else - objectsToProcess.addAll(listFilteredS3ObjectsForBucket(excludeItems, listObjectsV2Request, - bucketName, 
scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime(), globalStateMap)); + createFilteredS3ObjectPartitionsForBucket(excludeItems, listObjectsV2Request, + bucketName, scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime(), globalStateMap); + if (!bucketScanTime.containsKey(bucketName)) { bucketScanTime.put(bucketName, updatedScanTime.toString()); } @@ -117,10 +123,11 @@ public List apply(final Map globalStateMap) globalStateMap.put(SCAN_COUNT, (Integer) globalStateMap.get(SCAN_COUNT) + 1); globalStateMap.put(LAST_SCAN_TIME, Instant.now().toEpochMilli()); - return objectsToProcess; + // Partitions are created by the supplier, so source coordinator does not need to attempt to create any partitions + return Collections.emptyList(); } - private List listFilteredS3ObjectsForBucket(final List excludeKeyPaths, + private void createFilteredS3ObjectPartitionsForBucket(final List excludeKeyPaths, final ListObjectsV2Request.Builder listObjectsV2Request, final String bucket, final LocalDateTime startDateTime, @@ -128,11 +135,11 @@ private List listFilteredS3ObjectsForBucket(final List globalStateMap) { final Instant previousScanTime = globalStateMap.get(bucket) != null ? Instant.parse((String) globalStateMap.get(bucket)) : null; final boolean isFirstScan = previousScanTime == null; - final List allPartitionIdentifiers = new ArrayList<>(); ListObjectsV2Response listObjectsV2Response = null; + do { listObjectsV2Response = s3Client.listObjectsV2(listObjectsV2Request.fetchOwner(true).continuationToken(Objects.nonNull(listObjectsV2Response) ? 
listObjectsV2Response.nextContinuationToken() : null).build()); - allPartitionIdentifiers.addAll(listObjectsV2Response.contents().stream() + final List partitionsForPage = listObjectsV2Response.contents().stream() .filter(s3Object -> deleteS3ObjectsOnRead || isLastModifiedTimeAfterMostRecentScanForBucket(previousScanTime, s3Object)) .map(s3Object -> Pair.of(s3Object.key(), instantToLocalDateTime(s3Object.lastModified()))) .filter(keyTimestampPair -> !keyTimestampPair.left().endsWith("/")) @@ -141,32 +148,17 @@ private List listFilteredS3ObjectsForBucket(final List isKeyMatchedBetweenTimeRange(keyTimestampPair.right(), startDateTime, endDateTime, isFirstScan)) .map(Pair::left) .map(objectKey -> PartitionIdentifier.builder().withPartitionKey(String.format(BUCKET_OBJECT_PARTITION_KEY_FORMAT, bucket, objectKey)).build()) - .collect(Collectors.toList())); - + .collect(Collectors.toList()); LOG.info("Found page of {} objects from bucket {}", listObjectsV2Response.keyCount(), bucket); - } while (listObjectsV2Response.isTruncated()); - - if (folderPartitioningOptions != null) { - final Set folderPartitions = allPartitionIdentifiers.stream() - .map(partitionIdentifier -> { - final String fullObjectKey = partitionIdentifier.getPartitionKey(); - final String prefix = getPrefixWithDepth(fullObjectKey); - if (prefix == null) { - return null; - } - return PartitionIdentifier.builder().withPartitionKey(prefix).build(); - }) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); - - LOG.info("Running in folder_partitions mode at depth {}, found {} unique prefixes from {} objects", folderPartitioningOptions.getFolderDepth(), folderPartitions.size(), allPartitionIdentifiers.size()); - - return new ArrayList<>(folderPartitions); - } else { - LOG.info("Returning partitions for {} S3 objects from bucket {}", allPartitionIdentifiers.size(), bucket); - } - return allPartitionIdentifiers; + if (folderPartitioningOptions != null) { + final List folderPartitionsForPage = 
getFolderPartitionIdentifiers(partitionsForPage); + sourceCoordinator.createPartitions(folderPartitionsForPage); + } else { + LOG.info("Creating partitions for {} S3 objects from bucket {}", partitionsForPage.size(), bucket); + sourceCoordinator.createPartitions(partitionsForPage); + } + } while (listObjectsV2Response.isTruncated()); } private LocalDateTime instantToLocalDateTime(final Instant instant) { @@ -264,4 +256,23 @@ private String getPrefixWithDepth(final String fullObjectKey) { int actualDepth = min(folderPartitioningOptions.getFolderDepth(), folders.length - 1); return String.join("/", Arrays.copyOfRange(folders, 0, actualDepth)) + "/"; } + + private List getFolderPartitionIdentifiers(final List objectPartitionIdentifiers) { + + final Set folderPartitions = objectPartitionIdentifiers.stream() + .map(partitionIdentifier -> { + final String fullObjectKey = partitionIdentifier.getPartitionKey(); + final String prefix = getPrefixWithDepth(fullObjectKey); + if (prefix == null) { + return null; + } + return PartitionIdentifier.builder().withPartitionKey(prefix).build(); + }) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + LOG.info("Running in folder_partitions mode at depth {}, found {} unique prefixes from {} objects", folderPartitioningOptions.getFolderDepth(), folderPartitions.size(), objectPartitionIdentifiers.size()); + + return new ArrayList<>(folderPartitions); + } } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java index eb807398f1..625e9e0aa6 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java @@ -156,7 +156,7 @@ public ScanObjectWorker(final S3Client s3Client, 
this.folderPartitioningOptions = s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(); this.acknowledgmentSetTimeout = s3SourceConfig.getS3ScanScanOptions().getAcknowledgmentTimeout(); - this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(), s3SourceConfig.isDeleteS3ObjectsOnRead()); + this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(), s3SourceConfig.isDeleteS3ObjectsOnRead(), sourceCoordinator); this.acknowledgmentsRemainingForPartitions = new ConcurrentHashMap<>(); this.objectsToDeleteForAcknowledgmentSets = new ConcurrentHashMap<>(); } @@ -373,7 +373,6 @@ private boolean shouldDeleteFolderPartition(final SourcePartition objectsToProcess, final SourcePartition folderPartition) { int objectsProcessed = 0; diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java index 5214399d4b..0a66443698 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java @@ -13,6 +13,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier; +import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.plugins.source.s3.configuration.FolderPartitioningOptions; import 
org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanBucketOption; import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption; @@ -47,9 +48,11 @@ import static org.hamcrest.Matchers.notNullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.source.s3.S3ScanPartitionCreationSupplier.LAST_SCAN_TIME; import static org.opensearch.dataprepper.plugins.source.s3.S3ScanPartitionCreationSupplier.SCAN_COUNT; @@ -63,6 +66,9 @@ public class S3ScanPartitionCreationSupplierTest { @Mock private BucketOwnerProvider bucketOwnerProvider; + @Mock + private SourceCoordinator sourceCoordinator; + private List scanOptionsList; private S3ScanSchedulingOptions schedulingOptions; @@ -80,7 +86,7 @@ void setup() { private Function, List> createObjectUnderTest() { - return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions, isDeleteS3ObjectsOnRead); + return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions, isDeleteS3ObjectsOnRead, sourceCoordinator); } @Test @@ -120,7 +126,8 @@ void getNextPartition_supplier_without_scheduling_options_returns_expected_Parti final Function, List> partitionCreationSupplier = createObjectUnderTest(); - final List expectedPartitionIdentifiers = new ArrayList<>(); + final List expectedPartitionIdentifiersFirstBucket = new ArrayList<>(); + final List expectedPartitionIdentifiersSecondBucket = new ArrayList<>(); final ListObjectsV2Response listObjectsResponse = mock(ListObjectsV2Response.class); final List 
s3ObjectsList = new ArrayList<>(); @@ -134,7 +141,7 @@ void getNextPartition_supplier_without_scheduling_options_returns_expected_Parti given(invalidForFirstBucketSuffixObject.key()).willReturn("test.invalid"); given(invalidForFirstBucketSuffixObject.lastModified()).willReturn(Instant.now()); s3ObjectsList.add(invalidForFirstBucketSuffixObject); - expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + invalidForFirstBucketSuffixObject.key()).build()); + expectedPartitionIdentifiersSecondBucket.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + invalidForFirstBucketSuffixObject.key()).build()); final S3Object invalidDueToLastModifiedOutsideOfStartEndObject = mock(S3Object.class); given(invalidDueToLastModifiedOutsideOfStartEndObject.key()).willReturn(UUID.randomUUID().toString()); @@ -145,14 +152,18 @@ void getNextPartition_supplier_without_scheduling_options_returns_expected_Parti given(validObject.key()).willReturn("valid"); given(validObject.lastModified()).willReturn(Instant.now()); s3ObjectsList.add(validObject); - expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build()); - expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build()); + expectedPartitionIdentifiersFirstBucket.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build()); + expectedPartitionIdentifiersSecondBucket.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build()); given(listObjectsResponse.contents()).willReturn(s3ObjectsList); final ArgumentCaptor listObjectsV2RequestArgumentCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class); given(s3Client.listObjectsV2(listObjectsV2RequestArgumentCaptor.capture())).willReturn(listObjectsResponse); + final ArgumentCaptor> createPartitionsArgumentCaptor = 
ArgumentCaptor.forClass(List.class); + doNothing().when(sourceCoordinator).createPartitions(createPartitionsArgumentCaptor.capture()); + + final Map globalStateMap = new HashMap<>(); final List resultingPartitions = partitionCreationSupplier.apply(globalStateMap); @@ -162,12 +173,20 @@ void getNextPartition_supplier_without_scheduling_options_returns_expected_Parti globalStateMap.put(secondBucket, null); + final List> createdPartitions = createPartitionsArgumentCaptor.getAllValues(); + assertThat(createdPartitions.size(), equalTo(2)); + assertThat(createdPartitions.get(0).size(), equalTo(expectedPartitionIdentifiersFirstBucket.size())); + assertThat(createdPartitions.get(0).stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiersFirstBucket.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + + assertThat(createdPartitions.get(1).size(), equalTo(expectedPartitionIdentifiersSecondBucket.size())); + assertThat(createdPartitions.get(1).stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiersSecondBucket.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + assertThat(partitionCreationSupplier.apply(globalStateMap), equalTo(Collections.emptyList())); + assertThat(resultingPartitions.isEmpty(), equalTo(true)); - assertThat(resultingPartitions, notNullValue()); - assertThat(resultingPartitions.size(), equalTo(expectedPartitionIdentifiers.size())); - assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), - containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + verifyNoMoreInteractions(sourceCoordinator); } @Test @@ -205,7 +224,8 @@ void 
getNextPartition_supplier_with_scheduling_options_returns_expected_Partitio final Function, List> partitionCreationSupplier = createObjectUnderTest(); - final List expectedPartitionIdentifiers = new ArrayList<>(); + final List expectedPartitionIdentifiersFirstBucket = new ArrayList<>(); + final List expectedPartitionIdentifiersSecondBucket = new ArrayList<>(); final ListObjectsV2Response listObjectsResponse = mock(ListObjectsV2Response.class); final List s3ObjectsList = new ArrayList<>(); @@ -219,26 +239,27 @@ void getNextPartition_supplier_with_scheduling_options_returns_expected_Partitio given(invalidForFirstBucketSuffixObject.key()).willReturn("test.invalid"); given(invalidForFirstBucketSuffixObject.lastModified()).willReturn(Instant.now().minusSeconds(2)); s3ObjectsList.add(invalidForFirstBucketSuffixObject); - expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + invalidForFirstBucketSuffixObject.key()).build()); + expectedPartitionIdentifiersSecondBucket.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + invalidForFirstBucketSuffixObject.key()).build()); final Instant mostRecentFirstScan = Instant.now().plusSeconds(2); final S3Object validObject = mock(S3Object.class); given(validObject.key()).willReturn("valid"); given(validObject.lastModified()).willReturn(mostRecentFirstScan); s3ObjectsList.add(validObject); - expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build()); - expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build()); + expectedPartitionIdentifiersFirstBucket.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build()); + expectedPartitionIdentifiersSecondBucket.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build()); final S3Object secondScanObject = 
mock(S3Object.class); final Instant mostRecentSecondScan = Instant.now().plusSeconds(10); given(secondScanObject.key()).willReturn("second-scan"); given(secondScanObject.lastModified()).willReturn(mostRecentSecondScan); - final List expectedPartitionIdentifiersSecondScan = new ArrayList<>(); - expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + secondScanObject.key()).build()); - expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build()); - expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + secondScanObject.key()).build()); - expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build()); + final List expectedPartitionIdentifiersSecondScanFirstBucket = new ArrayList<>(); + final List expectedPartitionIdentifiersSecondScanSecondBucket = new ArrayList<>(); + expectedPartitionIdentifiersSecondScanFirstBucket.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + secondScanObject.key()).build()); + expectedPartitionIdentifiersSecondScanFirstBucket.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build()); + expectedPartitionIdentifiersSecondScanSecondBucket.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + secondScanObject.key()).build()); + expectedPartitionIdentifiersSecondScanSecondBucket.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build()); final List secondScanObjects = new ArrayList<>(s3ObjectsList); secondScanObjects.add(secondScanObject); @@ -251,16 +272,15 @@ void getNextPartition_supplier_with_scheduling_options_returns_expected_Partitio final ArgumentCaptor listObjectsV2RequestArgumentCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class); 
given(s3Client.listObjectsV2(listObjectsV2RequestArgumentCaptor.capture())).willReturn(listObjectsResponse); + final ArgumentCaptor> createPartitionsArgumentCaptor = ArgumentCaptor.forClass(List.class); + doNothing().when(sourceCoordinator).createPartitions(createPartitionsArgumentCaptor.capture()); + final Map globalStateMap = new HashMap<>(); final Instant beforeFirstScan = Instant.now(); final List resultingPartitions = partitionCreationSupplier.apply(globalStateMap); - assertThat(resultingPartitions, notNullValue()); - assertThat(resultingPartitions.size(), equalTo(expectedPartitionIdentifiers.size())); - assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), - containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey) - .map(Matchers::equalTo).collect(Collectors.toList()))); + assertThat(resultingPartitions.isEmpty(), equalTo(true)); assertThat(globalStateMap, notNullValue()); assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true)); @@ -274,9 +294,25 @@ void getNextPartition_supplier_with_scheduling_options_returns_expected_Partitio final Instant beforeSecondScan = Instant.now(); final List secondScanPartitions = partitionCreationSupplier.apply(globalStateMap); - assertThat(secondScanPartitions.size(), equalTo(expectedPartitionIdentifiersSecondScan.size())); - assertThat(secondScanPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), - containsInAnyOrder(expectedPartitionIdentifiersSecondScan.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + assertThat(secondScanPartitions.isEmpty(), equalTo(true)); + + final List> createdPartitions = createPartitionsArgumentCaptor.getAllValues(); + assertThat(createdPartitions.size(), equalTo(4)); + assertThat(createdPartitions.get(0).size(), equalTo(expectedPartitionIdentifiersFirstBucket.size())); + 
assertThat(createdPartitions.get(0).stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiersFirstBucket.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + + assertThat(createdPartitions.get(1).size(), equalTo(expectedPartitionIdentifiersSecondBucket.size())); + assertThat(createdPartitions.get(1).stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiersSecondBucket.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + + assertThat(createdPartitions.get(2).size(), equalTo(expectedPartitionIdentifiersSecondScanFirstBucket.size())); + assertThat(createdPartitions.get(2).stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiersSecondScanFirstBucket.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + + assertThat(createdPartitions.get(3).size(), equalTo(expectedPartitionIdentifiersSecondScanSecondBucket.size())); + assertThat(createdPartitions.get(3).stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiersSecondScanSecondBucket.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); assertThat(globalStateMap, notNullValue()); assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true)); @@ -291,6 +327,8 @@ void getNextPartition_supplier_with_scheduling_options_returns_expected_Partitio assertThat(partitionCreationSupplier.apply(globalStateMap), equalTo(Collections.emptyList())); + verifyNoMoreInteractions(sourceCoordinator); + verify(listObjectsResponse, times(4)).contents(); } @@ -338,6 +376,9 @@ void 
scheduled_scan_filters_on_start_time_and_end_time_for_the_first_scan_and_fi given(validObject.lastModified()).willReturn(objectAfterStartAndEndTime); s3ObjectsList.add(validObject); + final ArgumentCaptor> createPartitionsArgumentCaptor = ArgumentCaptor.forClass(List.class); + doNothing().when(sourceCoordinator).createPartitions(createPartitionsArgumentCaptor.capture()); + final Instant objectBeforeStartTime = Instant.ofEpochMilli(startMillis).minus(500L, TimeUnit.SECONDS.toChronoUnit()); final S3Object invalidObject = mock(S3Object.class); given(invalidObject.key()).willReturn("invalid"); @@ -355,12 +396,20 @@ void scheduled_scan_filters_on_start_time_and_end_time_for_the_first_scan_and_fi final Function, List> partitionCreationSupplier = createObjectUnderTest(); final List firstScanPartitions = partitionCreationSupplier.apply(globalStateMap); - assertThat(firstScanPartitions.size(), equalTo(expectedPartitionIdentifiers.size())); - assertThat(firstScanPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + assertThat(firstScanPartitions.isEmpty(), equalTo(true)); + + final List> createdPartitions = createPartitionsArgumentCaptor.getAllValues(); + assertThat(createdPartitions.size(), equalTo(2)); + assertThat(createdPartitions.get(0).isEmpty(), equalTo(true)); + + assertThat(createdPartitions.get(1).size(), equalTo(expectedPartitionIdentifiers.size())); + assertThat(createdPartitions.get(1).stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); final List secondScanPartitions = partitionCreationSupplier.apply(globalStateMap); assertThat(secondScanPartitions.isEmpty(), equalTo(true)); + + verifyNoMoreInteractions(sourceCoordinator); } @Test @@ -403,7 +452,11 @@ void getNextPartition_with_folder_partitioning_enabled_returns_the_expected_part final 
Function, List> partitionCreationSupplier = createObjectUnderTest(); - final List expectedPartitionIdentifiers = new ArrayList<>(); + final ArgumentCaptor> createPartitionsArgumentCaptor = ArgumentCaptor.forClass(List.class); + doNothing().when(sourceCoordinator).createPartitions(createPartitionsArgumentCaptor.capture()); + + final List expectedPartitionIdentifiersFirstBucket = new ArrayList<>(); + final List expectedPartitionIdentifiersSecondBucket = new ArrayList<>(); final ListObjectsV2Response listObjectsResponse = mock(ListObjectsV2Response.class); final List s3ObjectsList = new ArrayList<>(); @@ -417,7 +470,7 @@ void getNextPartition_with_folder_partitioning_enabled_returns_the_expected_part given(invalidForFirstBucketSuffixObject.key()).willReturn("folder-1/test.invalid"); given(invalidForFirstBucketSuffixObject.lastModified()).willReturn(Instant.now()); s3ObjectsList.add(invalidForFirstBucketSuffixObject); - expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + "folder-1/").build()); + expectedPartitionIdentifiersSecondBucket.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + "folder-1/").build()); final S3Object invalidDueToLastModifiedOutsideOfStartEndObject = mock(S3Object.class); given(invalidDueToLastModifiedOutsideOfStartEndObject.key()).willReturn(UUID.randomUUID().toString()); @@ -428,14 +481,14 @@ void getNextPartition_with_folder_partitioning_enabled_returns_the_expected_part given(validObject.key()).willReturn("folder-1/valid"); given(validObject.lastModified()).willReturn(Instant.now()); s3ObjectsList.add(validObject); - expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + "folder-1/").build()); + expectedPartitionIdentifiersFirstBucket.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + "folder-1/").build()); final S3Object newFolderObject = mock(S3Object.class); 
given(newFolderObject.key()).willReturn("folder-2/valid"); given(newFolderObject.lastModified()).willReturn(Instant.now()); s3ObjectsList.add(newFolderObject); - expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + "folder-2/").build()); - expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + "folder-2/").build()); + expectedPartitionIdentifiersSecondBucket.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + "folder-2/").build()); + expectedPartitionIdentifiersFirstBucket.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + "folder-2/").build()); final S3Object noDepthFoundForFolder = mock(S3Object.class); given(noDepthFoundForFolder.key()).willReturn("no_folder.json"); @@ -449,6 +502,7 @@ void getNextPartition_with_folder_partitioning_enabled_returns_the_expected_part final Map globalStateMap = new HashMap<>(); final List resultingPartitions = partitionCreationSupplier.apply(globalStateMap); + assertThat(resultingPartitions.isEmpty(), equalTo(true)); assertThat(globalStateMap, notNullValue()); assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true)); @@ -458,10 +512,18 @@ void getNextPartition_with_folder_partitioning_enabled_returns_the_expected_part assertThat(partitionCreationSupplier.apply(globalStateMap), equalTo(Collections.emptyList())); - assertThat(resultingPartitions, notNullValue()); - assertThat(resultingPartitions.size(), equalTo(expectedPartitionIdentifiers.size())); - assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), - containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + + final List> createdPartitions = createPartitionsArgumentCaptor.getAllValues(); + assertThat(createdPartitions.size(), equalTo(2)); + assertThat(createdPartitions.get(0).size(), 
equalTo(expectedPartitionIdentifiersFirstBucket.size())); + assertThat(createdPartitions.get(0).stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiersFirstBucket.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + + assertThat(createdPartitions.get(1).size(), equalTo(expectedPartitionIdentifiersSecondBucket.size())); + assertThat(createdPartitions.get(1).stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiersSecondBucket.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + + verifyNoMoreInteractions(sourceCoordinator); } @Test From 359b7d0212f227863104254d2153d9503f476120 Mon Sep 17 00:00:00 2001 From: Siqi Ding Date: Fri, 22 Aug 2025 10:31:31 -0500 Subject: [PATCH 2/6] Renew the unit test to handle the two scans and separate buckets Signed-off-by: Siqi Ding --- .../S3ScanPartitionCreationSupplierTest.java | 40 ++++++++++++++----- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java index 0a66443698..5567d25437 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java @@ -611,16 +611,14 @@ void object_is_not_filtered_out_based_on_last_modified_timestamp_when_delete_obj final ArgumentCaptor listObjectsV2RequestArgumentCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class);
given(s3Client.listObjectsV2(listObjectsV2RequestArgumentCaptor.capture())).willReturn(listObjectsResponse); + final ArgumentCaptor> createPartitionsArgumentCaptor = ArgumentCaptor.forClass(List.class); + doNothing().when(sourceCoordinator).createPartitions(createPartitionsArgumentCaptor.capture()); + final Map globalStateMap = new HashMap<>(); final Instant beforeFirstScan = Instant.now(); final List resultingPartitions = partitionCreationSupplier.apply(globalStateMap); - - assertThat(resultingPartitions, notNullValue()); - assertThat(resultingPartitions.size(), equalTo(expectedPartitionIdentifiers.size())); - assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), - containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey) - .map(Matchers::equalTo).collect(Collectors.toList()))); + assertThat(resultingPartitions.isEmpty(), equalTo(true)); assertThat(globalStateMap, notNullValue()); assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true)); @@ -634,9 +632,33 @@ void object_is_not_filtered_out_based_on_last_modified_timestamp_when_delete_obj final Instant beforeSecondScan = Instant.now(); final List secondScanPartitions = partitionCreationSupplier.apply(globalStateMap); - assertThat(secondScanPartitions.size(), equalTo(expectedPartitionIdentifiersSecondScan.size())); - assertThat(secondScanPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), - containsInAnyOrder(expectedPartitionIdentifiersSecondScan.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + assertThat(secondScanPartitions.isEmpty(), equalTo(true)); + final List> createdPartitions = createPartitionsArgumentCaptor.getAllValues(); + assertThat(createdPartitions.size(), equalTo(4)); + + final List firstScanFirstBucketPartitions = createdPartitions.get(0); + final List firstScanSecondBucketPartitions = 
createdPartitions.get(1); + + final List allFirstScanPartitions = new ArrayList<>(); + allFirstScanPartitions.addAll(firstScanFirstBucketPartitions); + allFirstScanPartitions.addAll(firstScanSecondBucketPartitions); + + assertThat(allFirstScanPartitions.size(), equalTo(expectedPartitionIdentifiers.size())); + assertThat(allFirstScanPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey) + .map(Matchers::equalTo).collect(Collectors.toList()))); + + final List secondScanFirstBucketPartitions = createdPartitions.get(2); + final List secondScanSecondBucketPartitions = createdPartitions.get(3); + + final List allSecondScanPartitions = new ArrayList<>(); + allSecondScanPartitions.addAll(secondScanFirstBucketPartitions); + allSecondScanPartitions.addAll(secondScanSecondBucketPartitions); + + assertThat(allSecondScanPartitions.size(), equalTo(expectedPartitionIdentifiersSecondScan.size())); + assertThat(allSecondScanPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiersSecondScan.stream().map(PartitionIdentifier::getPartitionKey) + .map(Matchers::equalTo).collect(Collectors.toList()))); assertThat(globalStateMap, notNullValue()); assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true)); From 550fc20cb19ee34e60b3632e668b0d411ad334e2 Mon Sep 17 00:00:00 2001 From: Siqi Ding Date: Tue, 2 Sep 2025 15:07:29 -0500 Subject: [PATCH 3/6] Add global state ownership renewal during partition creation Signed-off-by: Siqi Ding --- .../LeaseBasedSourceCoordinator.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java 
b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java index 7dfcc6746b..d838bef92c 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java @@ -60,6 +60,7 @@ public class LeaseBasedSourceCoordinator implements SourceCoordinator { static final String PARTITION_NOT_OWNED_ERROR_COUNT = "partitionNotOwnedErrors"; static final String PARTITION_UPDATE_ERROR_COUNT = "PartitionUpdateErrors"; static final Duration DEFAULT_LEASE_TIMEOUT = Duration.ofMinutes(10); + static final Duration DEFAULT_RENEW_TIMEOUT = Duration.ofMinutes(3); private static final String hostName; static final String PARTITION_TYPE = "PARTITION"; @@ -204,7 +205,17 @@ private Optional> getNextPartitionInternal(final Function partitionIdentifiers) { + Instant lastOwnershipRenewal = Instant.now(); + for (final PartitionIdentifier partitionIdentifier : partitionIdentifiers) { + if (Instant.now().isAfter(lastOwnershipRenewal.plus(DEFAULT_RENEW_TIMEOUT))) { + LOG.info("Renewing global state ownership for partition creation"); + renewGlobalStateOwnershipForPartitionCreation(); + lastOwnershipRenewal = Instant.now(); + } + + + final Optional optionalPartitionItem = sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, partitionIdentifier.getPartitionKey()); if (optionalPartitionItem.isPresent()) { @@ -515,6 +526,20 @@ private Optional acquireGlobalStateForPartitionCreatio return globalStateItemForPartitionCreation; } + private void renewGlobalStateOwnershipForPartitionCreation() { + try { + final Optional globalStateItem = sourceCoordinationStore.getSourcePartitionItem( + sourceIdentifierWithGlobalStateType, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS); + + if (globalStateItem.isPresent() && 
ownerId.equals(globalStateItem.get().getPartitionOwner())) { + globalStateItem.get().setPartitionOwnershipTimeout(Instant.now().plus(DEFAULT_LEASE_TIMEOUT)); + sourceCoordinationStore.tryUpdateSourcePartitionItem(globalStateItem.get()); + } + } catch (final Exception e) { + LOG.warn("Failed to renew global state ownership for partition creation", e); + } + } + private void giveUpAndSaveGlobalStateForPartitionCreation(final SourcePartitionStoreItem globalStateItemForPartitionCreation, final Map globalStateMap) { globalStateItemForPartitionCreation.setSourcePartitionStatus(SourcePartitionStatus.UNASSIGNED); From e535221e06c0533e539af613e3c3555fe72c26b6 Mon Sep 17 00:00:00 2001 From: Siqi Ding Date: Tue, 2 Sep 2025 16:48:50 -0500 Subject: [PATCH 4/6] Add ownership renewal during S3 partition creation Prevents timeout during long-running operations by renewing global state ownership every 3 minutes Signed-off-by: Siqi Ding --- .../source/coordinator/SourceCoordinator.java | 5 +++++ .../LeaseBasedSourceCoordinator.java | 6 ++++-- .../plugins/source/s3/ScanObjectWorker.java | 14 ++++++++++++++ 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java index c38e657bca..53c84f6d8a 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java @@ -164,4 +164,9 @@ public interface SourceCoordinator { void renewPartitionOwnership(final String partitionKey); void deletePartition(final String partitionKey); + + /** + * Renews the global state ownership for partition creation to prevent timeout during long-running operations + */ + void renewGlobalStateOwnershipForPartitionCreation(); } diff --git 
a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java index d838bef92c..eaa793d367 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java @@ -61,6 +61,7 @@ public class LeaseBasedSourceCoordinator implements SourceCoordinator { static final String PARTITION_UPDATE_ERROR_COUNT = "PartitionUpdateErrors"; static final Duration DEFAULT_LEASE_TIMEOUT = Duration.ofMinutes(10); static final Duration DEFAULT_RENEW_TIMEOUT = Duration.ofMinutes(3); + static final Duration OWNERSHIP_RENEWAL_INTERVAL = Duration.ofMinutes(3); private static final String hostName; static final String PARTITION_TYPE = "PARTITION"; @@ -208,7 +209,7 @@ public void createPartitions(final List partitionIdentifier Instant lastOwnershipRenewal = Instant.now(); for (final PartitionIdentifier partitionIdentifier : partitionIdentifiers) { - if (Instant.now().isAfter(lastOwnershipRenewal.plus(DEFAULT_RENEW_TIMEOUT))) { + if (Instant.now().isAfter(lastOwnershipRenewal.plus(OWNERSHIP_RENEWAL_INTERVAL))) { LOG.info("Renewing global state ownership for partition creation"); renewGlobalStateOwnershipForPartitionCreation(); lastOwnershipRenewal = Instant.now(); @@ -526,7 +527,8 @@ private Optional acquireGlobalStateForPartitionCreatio return globalStateItemForPartitionCreation; } - private void renewGlobalStateOwnershipForPartitionCreation() { + @Override + public void renewGlobalStateOwnershipForPartitionCreation() { try { final Optional globalStateItem = sourceCoordinationStore.getSourcePartitionItem( sourceIdentifierWithGlobalStateType, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS); diff --git 
a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java index 625e9e0aa6..bc6fbf2901 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java @@ -58,6 +58,7 @@ public class ScanObjectWorker implements Runnable { private static final Integer MAX_OBJECTS_PER_ACKNOWLEDGMENT_SET = 1; static final Integer MAX_RETRIES = 5; static final Duration CHECKPOINT_OWNERSHIP_INTERVAL = Duration.ofMinutes(2); + static final Duration OWNERSHIP_RENEWAL_INTERVAL = Duration.ofMinutes(3); static final Duration NO_OBJECTS_FOUND_BEFORE_PARTITION_DELETION_DURATION = Duration.ofHours(1); private static final int RETRY_BACKOFF_ON_EXCEPTION_MILLIS = 5_000; @@ -305,6 +306,7 @@ private void processFolderPartition(final SourcePartition final String bucket = folderPartition.getPartitionKey().split("\\|")[0]; final String s3Prefix = folderPartition.getPartitionKey().split("\\|")[1]; + sourceCoordinator.renewGlobalStateOwnershipForPartitionCreation(); final List objectsToProcess = getObjectsForPrefix(bucket, s3Prefix); Optional folderPartitionState = folderPartition.getPartitionState(); @@ -337,12 +339,19 @@ private void processFolderPartition(final SourcePartition private List getObjectsForPrefix(final String bucket, final String s3Prefix) { ListObjectsV2Response listObjectsV2Response = null; final List objectsToProcess = new ArrayList<>(); + Instant lastOwnershipRenewal = Instant.now(); final ListObjectsV2Request.Builder listObjectsV2Request = ListObjectsV2Request.builder() .bucket(bucket) .prefix(s3Prefix); do { + // Renew ownership every 3 minutes during long listing operations + if (Duration.between(lastOwnershipRenewal, 
Instant.now()).compareTo(OWNERSHIP_RENEWAL_INTERVAL) >= 0) { + sourceCoordinator.renewGlobalStateOwnershipForPartitionCreation(); + lastOwnershipRenewal = Instant.now(); + } + listObjectsV2Response = s3Client.listObjectsV2(listObjectsV2Request .fetchOwner(true) .continuationToken(Objects.nonNull(listObjectsV2Response) ? listObjectsV2Response.nextContinuationToken() : null) @@ -379,8 +388,13 @@ private void processObjectsForFolderPartition(final List obje int objectIndex = 0; String activeAcknowledgmentSetId = null; AcknowledgementSet acknowledgementSet = null; + Instant lastOwnershipRenewal = Instant.now(); while (objectIndex < objectsToProcess.size() && objectsProcessed < folderPartitioningOptions.getMaxObjectsPerOwnership()) { + if (Duration.between(lastOwnershipRenewal, Instant.now()).compareTo(OWNERSHIP_RENEWAL_INTERVAL) >= 0) { + sourceCoordinator.renewGlobalStateOwnershipForPartitionCreation(); + lastOwnershipRenewal = Instant.now(); + } final S3ObjectReference s3ObjectReference = objectsToProcess.get(objectIndex); if (objectsProcessed % MAX_OBJECTS_PER_ACKNOWLEDGMENT_SET == 0) { if (acknowledgementSet != null) { From 793f5d33b2490e8886d8a0c268aae1a40f5fbf1f Mon Sep 17 00:00:00 2001 From: Siqi Ding Date: Tue, 9 Sep 2025 15:16:10 -0500 Subject: [PATCH 5/6] Remove the each time update in ScanObjectWorker; Add the lastGlobalOwnershipRenewal related renewGlobalOwnershipIfNeeded to ensure ownership is maintained both before starting and throughout the entire partition creation process Signed-off-by: Siqi Ding --- .../LeaseBasedSourceCoordinator.java | 20 +++++++++++-------- .../plugins/source/s3/ScanObjectWorker.java | 14 -------------- 2 files changed, 12 insertions(+), 22 deletions(-) diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java index eaa793d367..5184e11f0f 100644
--- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java @@ -96,6 +96,7 @@ public class LeaseBasedSourceCoordinator implements SourceCoordinator { private final ReentrantLock lock; private Instant lastSupplierRunTime; + private Instant lastGlobalOwnershipRenewal; static final Duration FORCE_SUPPLIER_AFTER_DURATION = Duration.ofMinutes(5); @@ -138,6 +139,7 @@ public LeaseBasedSourceCoordinator(final Class partitionProgressStateClass, this.partitionsDeleted = pluginMetrics.counter(PARTITIONS_DELETED); this.lock = new ReentrantLock(); this.lastSupplierRunTime = Instant.now(); + this.lastGlobalOwnershipRenewal = Instant.now(); } @Override @@ -206,16 +208,10 @@ private Optional> getNextPartitionInternal(final Function partitionIdentifiers) { - Instant lastOwnershipRenewal = Instant.now(); + renewGlobalOwnershipIfNeeded(); for (final PartitionIdentifier partitionIdentifier : partitionIdentifiers) { - if (Instant.now().isAfter(lastOwnershipRenewal.plus(OWNERSHIP_RENEWAL_INTERVAL))) { - LOG.info("Renewing global state ownership for partition creation"); - renewGlobalStateOwnershipForPartitionCreation(); - lastOwnershipRenewal = Instant.now(); - } - - + renewGlobalOwnershipIfNeeded(); final Optional optionalPartitionItem = sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, partitionIdentifier.getPartitionKey()); @@ -542,6 +538,14 @@ public void renewGlobalStateOwnershipForPartitionCreation() { } } + private void renewGlobalOwnershipIfNeeded() { + if (Instant.now().isAfter(lastGlobalOwnershipRenewal.plus(OWNERSHIP_RENEWAL_INTERVAL))) { + LOG.info("Renewing global state ownership for partition creation"); + renewGlobalStateOwnershipForPartitionCreation(); + lastGlobalOwnershipRenewal = Instant.now(); + } + } + private void 
giveUpAndSaveGlobalStateForPartitionCreation(final SourcePartitionStoreItem globalStateItemForPartitionCreation, final Map globalStateMap) { globalStateItemForPartitionCreation.setSourcePartitionStatus(SourcePartitionStatus.UNASSIGNED); diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java index bc6fbf2901..625e9e0aa6 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java @@ -58,7 +58,6 @@ public class ScanObjectWorker implements Runnable { private static final Integer MAX_OBJECTS_PER_ACKNOWLEDGMENT_SET = 1; static final Integer MAX_RETRIES = 5; static final Duration CHECKPOINT_OWNERSHIP_INTERVAL = Duration.ofMinutes(2); - static final Duration OWNERSHIP_RENEWAL_INTERVAL = Duration.ofMinutes(3); static final Duration NO_OBJECTS_FOUND_BEFORE_PARTITION_DELETION_DURATION = Duration.ofHours(1); private static final int RETRY_BACKOFF_ON_EXCEPTION_MILLIS = 5_000; @@ -306,7 +305,6 @@ private void processFolderPartition(final SourcePartition final String bucket = folderPartition.getPartitionKey().split("\\|")[0]; final String s3Prefix = folderPartition.getPartitionKey().split("\\|")[1]; - sourceCoordinator.renewGlobalStateOwnershipForPartitionCreation(); final List objectsToProcess = getObjectsForPrefix(bucket, s3Prefix); Optional folderPartitionState = folderPartition.getPartitionState(); @@ -339,19 +337,12 @@ private void processFolderPartition(final SourcePartition private List getObjectsForPrefix(final String bucket, final String s3Prefix) { ListObjectsV2Response listObjectsV2Response = null; final List objectsToProcess = new ArrayList<>(); - Instant lastOwnershipRenewal = Instant.now(); final 
ListObjectsV2Request.Builder listObjectsV2Request = ListObjectsV2Request.builder() .bucket(bucket) .prefix(s3Prefix); do { - // Renew ownership every 3 minutes during long listing operations - if (Duration.between(lastOwnershipRenewal, Instant.now()).compareTo(OWNERSHIP_RENEWAL_INTERVAL) >= 0) { - sourceCoordinator.renewGlobalStateOwnershipForPartitionCreation(); - lastOwnershipRenewal = Instant.now(); - } - listObjectsV2Response = s3Client.listObjectsV2(listObjectsV2Request .fetchOwner(true) .continuationToken(Objects.nonNull(listObjectsV2Response) ? listObjectsV2Response.nextContinuationToken() : null) @@ -388,13 +379,8 @@ private void processObjectsForFolderPartition(final List obje int objectIndex = 0; String activeAcknowledgmentSetId = null; AcknowledgementSet acknowledgementSet = null; - Instant lastOwnershipRenewal = Instant.now(); while (objectIndex < objectsToProcess.size() && objectsProcessed < folderPartitioningOptions.getMaxObjectsPerOwnership()) { - if (Duration.between(lastOwnershipRenewal, Instant.now()).compareTo(OWNERSHIP_RENEWAL_INTERVAL) >= 0) { - sourceCoordinator.renewGlobalStateOwnershipForPartitionCreation(); - lastOwnershipRenewal = Instant.now(); - } final S3ObjectReference s3ObjectReference = objectsToProcess.get(objectIndex); if (objectsProcessed % MAX_OBJECTS_PER_ACKNOWLEDGMENT_SET == 0) { if (acknowledgementSet != null) { From 0f402ede012559304f5bb7cff7b5f51f3186320b Mon Sep 17 00:00:00 2001 From: Siqi Ding Date: Tue, 9 Sep 2025 17:04:46 -0500 Subject: [PATCH 6/6] Make renewGlobalStateOwnershipForPartitionCreation private Signed-off-by: Siqi Ding --- .../model/source/coordinator/SourceCoordinator.java | 5 ----- .../core/sourcecoordination/LeaseBasedSourceCoordinator.java | 3 +-- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java 
b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java index 53c84f6d8a..c38e657bca 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java @@ -164,9 +164,4 @@ public interface SourceCoordinator { void renewPartitionOwnership(final String partitionKey); void deletePartition(final String partitionKey); - - /** - * Renews the global state ownership for partition creation to prevent timeout during long-running operations - */ - void renewGlobalStateOwnershipForPartitionCreation(); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java index 5184e11f0f..ed68fe3c86 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java @@ -523,8 +523,7 @@ private Optional acquireGlobalStateForPartitionCreatio return globalStateItemForPartitionCreation; } - @Override - public void renewGlobalStateOwnershipForPartitionCreation() { + private void renewGlobalStateOwnershipForPartitionCreation() { try { final Optional globalStateItem = sourceCoordinationStore.getSourcePartitionItem( sourceIdentifierWithGlobalStateType, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS);