-
Notifications
You must be signed in to change notification settings - Fork 325
Add change to s3 scan source to create partitions after each page of … #6006
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
107e594
359b7d0
550fc20
e535221
793f5d3
0f402ed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -60,6 +60,8 @@ public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> { | |
| static final String PARTITION_NOT_OWNED_ERROR_COUNT = "partitionNotOwnedErrors"; | ||
| static final String PARTITION_UPDATE_ERROR_COUNT = "PartitionUpdateErrors"; | ||
| static final Duration DEFAULT_LEASE_TIMEOUT = Duration.ofMinutes(10); | ||
| static final Duration DEFAULT_RENEW_TIMEOUT = Duration.ofMinutes(3); | ||
| static final Duration OWNERSHIP_RENEWAL_INTERVAL = Duration.ofMinutes(3); | ||
|
|
||
| private static final String hostName; | ||
| static final String PARTITION_TYPE = "PARTITION"; | ||
|
|
@@ -94,6 +96,7 @@ public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> { | |
| private final ReentrantLock lock; | ||
|
|
||
| private Instant lastSupplierRunTime; | ||
| private Instant lastGlobalOwnershipRenewal; | ||
|
|
||
| static final Duration FORCE_SUPPLIER_AFTER_DURATION = Duration.ofMinutes(5); | ||
|
|
||
|
|
@@ -136,6 +139,7 @@ public LeaseBasedSourceCoordinator(final Class<T> partitionProgressStateClass, | |
| this.partitionsDeleted = pluginMetrics.counter(PARTITIONS_DELETED); | ||
| this.lock = new ReentrantLock(); | ||
| this.lastSupplierRunTime = Instant.now(); | ||
| this.lastGlobalOwnershipRenewal = Instant.now(); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -202,8 +206,13 @@ private Optional<SourcePartition<T>> getNextPartitionInternal(final Function<Map | |
| return Optional.of(sourcePartition); | ||
| } | ||
|
|
||
| private void createPartitions(final List<PartitionIdentifier> partitionIdentifiers) { | ||
| @Override | ||
| public void createPartitions(final List<PartitionIdentifier> partitionIdentifiers) { | ||
| renewGlobalOwnershipIfNeeded(); | ||
|
|
||
| for (final PartitionIdentifier partitionIdentifier : partitionIdentifiers) { | ||
| renewGlobalOwnershipIfNeeded(); | ||
|
|
||
| final Optional<SourcePartitionStoreItem> optionalPartitionItem = sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, partitionIdentifier.getPartitionKey()); | ||
|
|
||
| if (optionalPartitionItem.isPresent()) { | ||
|
|
@@ -514,6 +523,28 @@ private Optional<SourcePartitionStoreItem> acquireGlobalStateForPartitionCreatio | |
| return globalStateItemForPartitionCreation; | ||
| } | ||
|
|
||
| private void renewGlobalStateOwnershipForPartitionCreation() { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We should be updating this ownership during the scan as well.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I have made the change and added the ownership update during the scan. |
||
| try { | ||
| final Optional<SourcePartitionStoreItem> globalStateItem = sourceCoordinationStore.getSourcePartitionItem( | ||
| sourceIdentifierWithGlobalStateType, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS); | ||
|
|
||
| if (globalStateItem.isPresent() && ownerId.equals(globalStateItem.get().getPartitionOwner())) { | ||
| globalStateItem.get().setPartitionOwnershipTimeout(Instant.now().plus(DEFAULT_LEASE_TIMEOUT)); | ||
| sourceCoordinationStore.tryUpdateSourcePartitionItem(globalStateItem.get()); | ||
| } | ||
| } catch (final Exception e) { | ||
| LOG.warn("Failed to renew global state ownership for partition creation", e); | ||
| } | ||
| } | ||
|
|
||
| private void renewGlobalOwnershipIfNeeded() { | ||
| if (Instant.now().isAfter(lastGlobalOwnershipRenewal.plus(OWNERSHIP_RENEWAL_INTERVAL))) { | ||
| LOG.info("Renewing global state ownership for partition creation"); | ||
| renewGlobalStateOwnershipForPartitionCreation(); | ||
| lastGlobalOwnershipRenewal = Instant.now(); | ||
| } | ||
| } | ||
|
|
||
| private void giveUpAndSaveGlobalStateForPartitionCreation(final SourcePartitionStoreItem globalStateItemForPartitionCreation, final Map<String, Object> globalStateMap) { | ||
|
|
||
| globalStateItemForPartitionCreation.setSourcePartitionStatus(SourcePartitionStatus.UNASSIGNED); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ | |
| package org.opensearch.dataprepper.plugins.source.s3; | ||
|
|
||
| import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier; | ||
| import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; | ||
| import org.opensearch.dataprepper.plugins.source.s3.configuration.FolderPartitioningOptions; | ||
| import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption; | ||
| import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanSchedulingOptions; | ||
|
|
@@ -53,19 +54,23 @@ public class S3ScanPartitionCreationSupplier implements Function<Map<String, Obj | |
|
|
||
| private final boolean deleteS3ObjectsOnRead; | ||
|
|
||
| private final SourceCoordinator<S3SourceProgressState> sourceCoordinator; | ||
|
|
||
| public S3ScanPartitionCreationSupplier(final S3Client s3Client, | ||
| final BucketOwnerProvider bucketOwnerProvider, | ||
| final List<ScanOptions> scanOptionsList, | ||
| final S3ScanSchedulingOptions schedulingOptions, | ||
| final FolderPartitioningOptions folderPartitioningOptions, | ||
| final boolean deleteS3ObjectsOnRead) { | ||
| final boolean deleteS3ObjectsOnRead, | ||
| final SourceCoordinator<S3SourceProgressState> sourceCoordinator) { | ||
|
|
||
| this.s3Client = s3Client; | ||
| this.bucketOwnerProvider = bucketOwnerProvider; | ||
| this.scanOptionsList = scanOptionsList; | ||
| this.schedulingOptions = schedulingOptions; | ||
| this.folderPartitioningOptions = folderPartitioningOptions; | ||
| this.deleteS3ObjectsOnRead = deleteS3ObjectsOnRead; | ||
| this.sourceCoordinator = sourceCoordinator; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -99,12 +104,13 @@ public List<PartitionIdentifier> apply(final Map<String, Object> globalStateMap) | |
| if (Objects.nonNull(s3ScanKeyPathOption) && Objects.nonNull(s3ScanKeyPathOption.getS3scanIncludePrefixOptions())) | ||
| s3ScanKeyPathOption.getS3scanIncludePrefixOptions().forEach(includePath -> { | ||
| listObjectsV2Request.prefix(includePath); | ||
| objectsToProcess.addAll(listFilteredS3ObjectsForBucket(excludeItems, listObjectsV2Request, | ||
| bucketName, scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime(), globalStateMap)); | ||
| createFilteredS3ObjectPartitionsForBucket(excludeItems, listObjectsV2Request, | ||
| bucketName, scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime(), globalStateMap); | ||
| }); | ||
| else | ||
| objectsToProcess.addAll(listFilteredS3ObjectsForBucket(excludeItems, listObjectsV2Request, | ||
| bucketName, scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime(), globalStateMap)); | ||
| createFilteredS3ObjectPartitionsForBucket(excludeItems, listObjectsV2Request, | ||
| bucketName, scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime(), globalStateMap); | ||
|
|
||
| if (!bucketScanTime.containsKey(bucketName)) { | ||
| bucketScanTime.put(bucketName, updatedScanTime.toString()); | ||
| } | ||
|
|
@@ -117,22 +123,23 @@ public List<PartitionIdentifier> apply(final Map<String, Object> globalStateMap) | |
| globalStateMap.put(SCAN_COUNT, (Integer) globalStateMap.get(SCAN_COUNT) + 1); | ||
| globalStateMap.put(LAST_SCAN_TIME, Instant.now().toEpochMilli()); | ||
|
|
||
| return objectsToProcess; | ||
| // Partitions are created by the supplier, so source coordinator does not need to attempt to create any partitions | ||
| return Collections.emptyList(); | ||
| } | ||
|
|
||
| private List<PartitionIdentifier> listFilteredS3ObjectsForBucket(final List<String> excludeKeyPaths, | ||
| private void createFilteredS3ObjectPartitionsForBucket(final List<String> excludeKeyPaths, | ||
| final ListObjectsV2Request.Builder listObjectsV2Request, | ||
| final String bucket, | ||
| final LocalDateTime startDateTime, | ||
| final LocalDateTime endDateTime, | ||
| final Map<String, Object> globalStateMap) { | ||
| final Instant previousScanTime = globalStateMap.get(bucket) != null ? Instant.parse((String) globalStateMap.get(bucket)) : null; | ||
| final boolean isFirstScan = previousScanTime == null; | ||
| final List<PartitionIdentifier> allPartitionIdentifiers = new ArrayList<>(); | ||
| ListObjectsV2Response listObjectsV2Response = null; | ||
|
|
||
| do { | ||
| listObjectsV2Response = s3Client.listObjectsV2(listObjectsV2Request.fetchOwner(true).continuationToken(Objects.nonNull(listObjectsV2Response) ? listObjectsV2Response.nextContinuationToken() : null).build()); | ||
| allPartitionIdentifiers.addAll(listObjectsV2Response.contents().stream() | ||
| final List<PartitionIdentifier> partitionsForPage = listObjectsV2Response.contents().stream() | ||
| .filter(s3Object -> deleteS3ObjectsOnRead || isLastModifiedTimeAfterMostRecentScanForBucket(previousScanTime, s3Object)) | ||
| .map(s3Object -> Pair.of(s3Object.key(), instantToLocalDateTime(s3Object.lastModified()))) | ||
| .filter(keyTimestampPair -> !keyTimestampPair.left().endsWith("/")) | ||
|
|
@@ -141,32 +148,17 @@ private List<PartitionIdentifier> listFilteredS3ObjectsForBucket(final List<Stri | |
| .filter(keyTimestampPair -> isKeyMatchedBetweenTimeRange(keyTimestampPair.right(), startDateTime, endDateTime, isFirstScan)) | ||
| .map(Pair::left) | ||
| .map(objectKey -> PartitionIdentifier.builder().withPartitionKey(String.format(BUCKET_OBJECT_PARTITION_KEY_FORMAT, bucket, objectKey)).build()) | ||
| .collect(Collectors.toList())); | ||
|
|
||
| .collect(Collectors.toList()); | ||
| LOG.info("Found page of {} objects from bucket {}", listObjectsV2Response.keyCount(), bucket); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We should have a check to update global state ownership here. |
||
| } while (listObjectsV2Response.isTruncated()); | ||
|
|
||
| if (folderPartitioningOptions != null) { | ||
| final Set<PartitionIdentifier> folderPartitions = allPartitionIdentifiers.stream() | ||
| .map(partitionIdentifier -> { | ||
| final String fullObjectKey = partitionIdentifier.getPartitionKey(); | ||
| final String prefix = getPrefixWithDepth(fullObjectKey); | ||
| if (prefix == null) { | ||
| return null; | ||
| } | ||
| return PartitionIdentifier.builder().withPartitionKey(prefix).build(); | ||
| }) | ||
| .filter(Objects::nonNull) | ||
| .collect(Collectors.toSet()); | ||
|
|
||
| LOG.info("Running in folder_partitions mode at depth {}, found {} unique prefixes from {} objects", folderPartitioningOptions.getFolderDepth(), folderPartitions.size(), allPartitionIdentifiers.size()); | ||
|
|
||
| return new ArrayList<>(folderPartitions); | ||
| } else { | ||
| LOG.info("Returning partitions for {} S3 objects from bucket {}", allPartitionIdentifiers.size(), bucket); | ||
| } | ||
|
|
||
| return allPartitionIdentifiers; | ||
| if (folderPartitioningOptions != null) { | ||
| final List<PartitionIdentifier> folderPartitionsForPage = getFolderPartitionIdentifiers(partitionsForPage); | ||
| sourceCoordinator.createPartitions(folderPartitionsForPage); | ||
| } else { | ||
| LOG.info("Creating partitions for {} S3 objects from bucket {}", partitionsForPage.size(), bucket); | ||
| sourceCoordinator.createPartitions(partitionsForPage); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It is worth considering adding some way to update global state ownership periodically in this call to create partitions, as this can potentially take a long time. I would probably have the LeaseBasedSourceCoordinator do this within this method.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I have added a method to update global state ownership periodically. |
||
| } | ||
| } while (listObjectsV2Response.isTruncated()); | ||
| } | ||
|
|
||
| private LocalDateTime instantToLocalDateTime(final Instant instant) { | ||
|
|
@@ -264,4 +256,23 @@ private String getPrefixWithDepth(final String fullObjectKey) { | |
| int actualDepth = min(folderPartitioningOptions.getFolderDepth(), folders.length - 1); | ||
| return String.join("/", Arrays.copyOfRange(folders, 0, actualDepth)) + "/"; | ||
| } | ||
|
|
||
| private List<PartitionIdentifier> getFolderPartitionIdentifiers(final List<PartitionIdentifier> objectPartitionIdentifiers) { | ||
|
|
||
| final Set<PartitionIdentifier> folderPartitions = objectPartitionIdentifiers.stream() | ||
| .map(partitionIdentifier -> { | ||
| final String fullObjectKey = partitionIdentifier.getPartitionKey(); | ||
| final String prefix = getPrefixWithDepth(fullObjectKey); | ||
| if (prefix == null) { | ||
| return null; | ||
| } | ||
| return PartitionIdentifier.builder().withPartitionKey(prefix).build(); | ||
| }) | ||
| .filter(Objects::nonNull) | ||
| .collect(Collectors.toSet()); | ||
|
|
||
| LOG.info("Running in folder_partitions mode at depth {}, found {} unique prefixes from {} objects", folderPartitioningOptions.getFolderDepth(), folderPartitions.size(), objectPartitionIdentifiers.size()); | ||
|
|
||
| return new ArrayList<>(folderPartitions); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the work to renew global partition ownership left out of this PR?