Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ public interface SourceCoordinator<T> {
*/
Optional<SourcePartition<T>> getNextPartition(final Function<Map<String, Object>, List<PartitionIdentifier>> partitionCreationSupplier, final boolean forceSupplier);

/**
 * Directly creates the given partitions for source coordination. This is an alternative to relying on
 * {@link #getNextPartition(Function, boolean)} to create partitions from the list returned by its
 * partition creation supplier.
 *
 * @param partitionIdentifiers - The partitions to be created.
 */
void createPartitions(final List<PartitionIdentifier> partitionIdentifiers);

/**
* Should be called by the source when it has fully processed a given partition
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotFoundException if the partition key could not be found in the distributed store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {
static final String PARTITION_NOT_OWNED_ERROR_COUNT = "partitionNotOwnedErrors";
static final String PARTITION_UPDATE_ERROR_COUNT = "PartitionUpdateErrors";
static final Duration DEFAULT_LEASE_TIMEOUT = Duration.ofMinutes(10);
static final Duration DEFAULT_RENEW_TIMEOUT = Duration.ofMinutes(3);
static final Duration OWNERSHIP_RENEWAL_INTERVAL = Duration.ofMinutes(3);

private static final String hostName;
static final String PARTITION_TYPE = "PARTITION";
Expand Down Expand Up @@ -94,6 +96,7 @@ public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {
private final ReentrantLock lock;

private Instant lastSupplierRunTime;
private Instant lastGlobalOwnershipRenewal;

static final Duration FORCE_SUPPLIER_AFTER_DURATION = Duration.ofMinutes(5);

Expand Down Expand Up @@ -136,6 +139,7 @@ public LeaseBasedSourceCoordinator(final Class<T> partitionProgressStateClass,
this.partitionsDeleted = pluginMetrics.counter(PARTITIONS_DELETED);
this.lock = new ReentrantLock();
this.lastSupplierRunTime = Instant.now();
this.lastGlobalOwnershipRenewal = Instant.now();
}

@Override
Expand Down Expand Up @@ -202,8 +206,13 @@ private Optional<SourcePartition<T>> getNextPartitionInternal(final Function<Map
return Optional.of(sourcePartition);
}

private void createPartitions(final List<PartitionIdentifier> partitionIdentifiers) {
@Override
public void createPartitions(final List<PartitionIdentifier> partitionIdentifiers) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the work to renew global partition ownership left out of this PR?

renewGlobalOwnershipIfNeeded();

for (final PartitionIdentifier partitionIdentifier : partitionIdentifiers) {
renewGlobalOwnershipIfNeeded();

final Optional<SourcePartitionStoreItem> optionalPartitionItem = sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, partitionIdentifier.getPartitionKey());

if (optionalPartitionItem.isPresent()) {
Expand Down Expand Up @@ -514,6 +523,28 @@ private Optional<SourcePartitionStoreItem> acquireGlobalStateForPartitionCreatio
return globalStateItemForPartitionCreation;
}

private void renewGlobalStateOwnershipForPartitionCreation() {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be updating this ownership during the scan as well

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have made the change and added the ownership update during the scan.

try {
final Optional<SourcePartitionStoreItem> globalStateItem = sourceCoordinationStore.getSourcePartitionItem(
sourceIdentifierWithGlobalStateType, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS);

if (globalStateItem.isPresent() && ownerId.equals(globalStateItem.get().getPartitionOwner())) {
globalStateItem.get().setPartitionOwnershipTimeout(Instant.now().plus(DEFAULT_LEASE_TIMEOUT));
sourceCoordinationStore.tryUpdateSourcePartitionItem(globalStateItem.get());
}
} catch (final Exception e) {
LOG.warn("Failed to renew global state ownership for partition creation", e);
}
}

/**
 * Renews ownership of the partition-creation global state, but only when more than
 * OWNERSHIP_RENEWAL_INTERVAL has passed since the last recorded renewal. The renewal
 * timestamp is reset after every renewal attempt made here.
 */
private void renewGlobalOwnershipIfNeeded() {
    final Instant currentTime = Instant.now();
    final Instant renewalDueAt = lastGlobalOwnershipRenewal.plus(OWNERSHIP_RENEWAL_INTERVAL);

    // Still inside the renewal interval: nothing to do yet.
    if (!currentTime.isAfter(renewalDueAt)) {
        return;
    }

    LOG.info("Renewing global state ownership for partition creation");
    renewGlobalStateOwnershipForPartitionCreation();
    lastGlobalOwnershipRenewal = Instant.now();
}

private void giveUpAndSaveGlobalStateForPartitionCreation(final SourcePartitionStoreItem globalStateItemForPartitionCreation, final Map<String, Object> globalStateMap) {

globalStateItemForPartitionCreation.setSourcePartitionStatus(SourcePartitionStatus.UNASSIGNED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.source.s3;

import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.source.s3.configuration.FolderPartitioningOptions;
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption;
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanSchedulingOptions;
Expand Down Expand Up @@ -53,19 +54,23 @@ public class S3ScanPartitionCreationSupplier implements Function<Map<String, Obj

private final boolean deleteS3ObjectsOnRead;

private final SourceCoordinator<S3SourceProgressState> sourceCoordinator;

public S3ScanPartitionCreationSupplier(final S3Client s3Client,
final BucketOwnerProvider bucketOwnerProvider,
final List<ScanOptions> scanOptionsList,
final S3ScanSchedulingOptions schedulingOptions,
final FolderPartitioningOptions folderPartitioningOptions,
final boolean deleteS3ObjectsOnRead) {
final boolean deleteS3ObjectsOnRead,
final SourceCoordinator<S3SourceProgressState> sourceCoordinator) {

this.s3Client = s3Client;
this.bucketOwnerProvider = bucketOwnerProvider;
this.scanOptionsList = scanOptionsList;
this.schedulingOptions = schedulingOptions;
this.folderPartitioningOptions = folderPartitioningOptions;
this.deleteS3ObjectsOnRead = deleteS3ObjectsOnRead;
this.sourceCoordinator = sourceCoordinator;
}

@Override
Expand Down Expand Up @@ -99,12 +104,13 @@ public List<PartitionIdentifier> apply(final Map<String, Object> globalStateMap)
if (Objects.nonNull(s3ScanKeyPathOption) && Objects.nonNull(s3ScanKeyPathOption.getS3scanIncludePrefixOptions()))
s3ScanKeyPathOption.getS3scanIncludePrefixOptions().forEach(includePath -> {
listObjectsV2Request.prefix(includePath);
objectsToProcess.addAll(listFilteredS3ObjectsForBucket(excludeItems, listObjectsV2Request,
bucketName, scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime(), globalStateMap));
createFilteredS3ObjectPartitionsForBucket(excludeItems, listObjectsV2Request,
bucketName, scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime(), globalStateMap);
});
else
objectsToProcess.addAll(listFilteredS3ObjectsForBucket(excludeItems, listObjectsV2Request,
bucketName, scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime(), globalStateMap));
createFilteredS3ObjectPartitionsForBucket(excludeItems, listObjectsV2Request,
bucketName, scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime(), globalStateMap);

if (!bucketScanTime.containsKey(bucketName)) {
bucketScanTime.put(bucketName, updatedScanTime.toString());
}
Expand All @@ -117,22 +123,23 @@ public List<PartitionIdentifier> apply(final Map<String, Object> globalStateMap)
globalStateMap.put(SCAN_COUNT, (Integer) globalStateMap.get(SCAN_COUNT) + 1);
globalStateMap.put(LAST_SCAN_TIME, Instant.now().toEpochMilli());

return objectsToProcess;
// Partitions are created by the supplier, so source coordinator does not need to attempt to create any partitions
return Collections.emptyList();
}

private List<PartitionIdentifier> listFilteredS3ObjectsForBucket(final List<String> excludeKeyPaths,
private void createFilteredS3ObjectPartitionsForBucket(final List<String> excludeKeyPaths,
final ListObjectsV2Request.Builder listObjectsV2Request,
final String bucket,
final LocalDateTime startDateTime,
final LocalDateTime endDateTime,
final Map<String, Object> globalStateMap) {
final Instant previousScanTime = globalStateMap.get(bucket) != null ? Instant.parse((String) globalStateMap.get(bucket)) : null;
final boolean isFirstScan = previousScanTime == null;
final List<PartitionIdentifier> allPartitionIdentifiers = new ArrayList<>();
ListObjectsV2Response listObjectsV2Response = null;

do {
listObjectsV2Response = s3Client.listObjectsV2(listObjectsV2Request.fetchOwner(true).continuationToken(Objects.nonNull(listObjectsV2Response) ? listObjectsV2Response.nextContinuationToken() : null).build());
allPartitionIdentifiers.addAll(listObjectsV2Response.contents().stream()
final List<PartitionIdentifier> partitionsForPage = listObjectsV2Response.contents().stream()
.filter(s3Object -> deleteS3ObjectsOnRead || isLastModifiedTimeAfterMostRecentScanForBucket(previousScanTime, s3Object))
.map(s3Object -> Pair.of(s3Object.key(), instantToLocalDateTime(s3Object.lastModified())))
.filter(keyTimestampPair -> !keyTimestampPair.left().endsWith("/"))
Expand All @@ -141,32 +148,17 @@ private List<PartitionIdentifier> listFilteredS3ObjectsForBucket(final List<Stri
.filter(keyTimestampPair -> isKeyMatchedBetweenTimeRange(keyTimestampPair.right(), startDateTime, endDateTime, isFirstScan))
.map(Pair::left)
.map(objectKey -> PartitionIdentifier.builder().withPartitionKey(String.format(BUCKET_OBJECT_PARTITION_KEY_FORMAT, bucket, objectKey)).build())
.collect(Collectors.toList()));

.collect(Collectors.toList());
LOG.info("Found page of {} objects from bucket {}", listObjectsV2Response.keyCount(), bucket);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have a check to update global state ownership here

} while (listObjectsV2Response.isTruncated());

if (folderPartitioningOptions != null) {
final Set<PartitionIdentifier> folderPartitions = allPartitionIdentifiers.stream()
.map(partitionIdentifier -> {
final String fullObjectKey = partitionIdentifier.getPartitionKey();
final String prefix = getPrefixWithDepth(fullObjectKey);
if (prefix == null) {
return null;
}
return PartitionIdentifier.builder().withPartitionKey(prefix).build();
})
.filter(Objects::nonNull)
.collect(Collectors.toSet());

LOG.info("Running in folder_partitions mode at depth {}, found {} unique prefixes from {} objects", folderPartitioningOptions.getFolderDepth(), folderPartitions.size(), allPartitionIdentifiers.size());

return new ArrayList<>(folderPartitions);
} else {
LOG.info("Returning partitions for {} S3 objects from bucket {}", allPartitionIdentifiers.size(), bucket);
}

return allPartitionIdentifiers;
if (folderPartitioningOptions != null) {
final List<PartitionIdentifier> folderPartitionsForPage = getFolderPartitionIdentifiers(partitionsForPage);
sourceCoordinator.createPartitions(folderPartitionsForPage);
} else {
LOG.info("Creating partitions for {} S3 objects from bucket {}", partitionsForPage.size(), bucket);
sourceCoordinator.createPartitions(partitionsForPage);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is worth considering adding some way to update global state ownership periodically in this call to create partitions as this can potentially take a long time. I would probably have the LeaseBasedSourceCoordinator do this within this method.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added a method to update global state ownership periodically.

}
} while (listObjectsV2Response.isTruncated());
}

private LocalDateTime instantToLocalDateTime(final Instant instant) {
Expand Down Expand Up @@ -264,4 +256,23 @@ private String getPrefixWithDepth(final String fullObjectKey) {
int actualDepth = min(folderPartitioningOptions.getFolderDepth(), folders.length - 1);
return String.join("/", Arrays.copyOfRange(folders, 0, actualDepth)) + "/";
}

/**
 * Collapses object-level partition identifiers into the distinct folder-prefix partitions derived from them.
 * Each partition key is treated as a full object key and mapped through getPrefixWithDepth; keys for which
 * no prefix is produced are left out.
 *
 * @param objectPartitionIdentifiers the object-level partition identifiers to collapse
 * @return the unique folder-prefix partition identifiers, in no guaranteed order
 */
private List<PartitionIdentifier> getFolderPartitionIdentifiers(final List<PartitionIdentifier> objectPartitionIdentifiers) {
    final Set<PartitionIdentifier> uniquePrefixPartitions = objectPartitionIdentifiers.stream()
            .map(PartitionIdentifier::getPartitionKey)
            .map(this::getPrefixWithDepth)
            // getPrefixWithDepth may yield null; such keys contribute no folder partition
            .filter(Objects::nonNull)
            .map(prefix -> PartitionIdentifier.builder().withPartitionKey(prefix).build())
            .collect(Collectors.toSet());

    LOG.info("Running in folder_partitions mode at depth {}, found {} unique prefixes from {} objects", folderPartitioningOptions.getFolderDepth(), uniquePrefixPartitions.size(), objectPartitionIdentifiers.size());

    return new ArrayList<>(uniquePrefixPartitions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public ScanObjectWorker(final S3Client s3Client,
this.folderPartitioningOptions = s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions();
this.acknowledgmentSetTimeout = s3SourceConfig.getS3ScanScanOptions().getAcknowledgmentTimeout();

this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(), s3SourceConfig.isDeleteS3ObjectsOnRead());
this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(), s3SourceConfig.isDeleteS3ObjectsOnRead(), sourceCoordinator);
this.acknowledgmentsRemainingForPartitions = new ConcurrentHashMap<>();
this.objectsToDeleteForAcknowledgmentSets = new ConcurrentHashMap<>();
}
Expand Down Expand Up @@ -373,7 +373,6 @@ private boolean shouldDeleteFolderPartition(final SourcePartition<S3SourceProgre
return false;
}


private void processObjectsForFolderPartition(final List<S3ObjectReference> objectsToProcess,
final SourcePartition<S3SourceProgressState> folderPartition) {
int objectsProcessed = 0;
Expand Down
Loading
Loading