Add change to s3 scan source to create partitions after each page of …#6006

Merged
oeyh merged 6 commits into opensearch-project:main from Davidding4718:S3ScanChange on Sep 10, 2025
@Davidding4718 Davidding4718 commented Aug 21, 2025

Contributor

(cherry picked from commit bf74cb7)

Description

Implements global state ownership renewal functionality to prevent ownership timeouts during long-running S3 partition creation operations:

  • This change adds a renewGlobalStatePartitionOwnership() method to LeaseBasedSourceCoordinator and integrates periodic ownership renewal (every 2 minutes) in S3ScanPartitionCreationSupplier to maintain ownership during large S3 bucket scanning operations.
  • The implementation ensures reliable partition creation for large S3 buckets without losing global state ownership due to lease expiration.
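The time-gated renewal described above can be sketched as a small helper. This is a minimal illustration, not the code in this PR: `OwnershipRenewalTimer`, its injectable clock, and the `renewAction` callback are hypothetical names, and the interval is left as a parameter because this thread mentions both 2 and 3 minutes.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not a Data Prepper class): runs a renewal action
 * at most once per interval, however often it is polled.
 */
final class OwnershipRenewalTimer {
    private final Duration interval;
    private final Supplier<Instant> clock;  // injectable so tests can control time
    private final Runnable renewAction;     // assumed to wrap the coordinator's renewal call
    private Instant lastRenewal;

    OwnershipRenewalTimer(final Duration interval,
                          final Supplier<Instant> clock,
                          final Runnable renewAction) {
        this.interval = interval;
        this.clock = clock;
        this.renewAction = renewAction;
        this.lastRenewal = clock.get();     // ownership is assumed fresh at construction
    }

    /** Renews if at least one interval has elapsed; returns true when a renewal ran. */
    boolean renewIfNeeded() {
        final Instant now = clock.get();
        if (Duration.between(lastRenewal, now).compareTo(interval) < 0) {
            return false;                   // still inside the current interval
        }
        renewAction.run();
        lastRenewal = now;
        return true;
    }
}
```

In production the clock would typically be `Instant::now`; passing it in keeps the timing logic testable without sleeping.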

Issues Resolved

Resolves #5039

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

…objects listed

Signed-off-by: Taylor Gray <tylgry@amazon.com>
(cherry picked from commit bf74cb7)
Signed-off-by: Siqi Ding <dingdd@amazon.com>

private void createPartitions(final List<PartitionIdentifier> partitionIdentifiers) {
@Override
public void createPartitions(final List<PartitionIdentifier> partitionIdentifiers) {

Member

Is the work to renew global partition ownership left out of this PR?

.collect(Collectors.toList()));

.collect(Collectors.toList());
LOG.info("Found page of {} objects from bucket {}", listObjectsV2Response.keyCount(), bucket);

Member

We should have a check to update global state ownership here
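One way to picture the suggestion is a paged scan that runs the ownership check on every page and creates partitions as soon as a page is listed. The sketch below is an assumption-laden illustration: `PagedPartitionCreator`, `Page`, and the three callbacks are hypothetical stand-ins for the S3 listing call, the coordinator's `createPartitions`, and the renewal check, not the actual code under review.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical sketch of per-page partition creation; not the PR's implementation. */
final class PagedPartitionCreator {

    /** One page of listing results plus the token for the next page (null when done). */
    static final class Page {
        final List<String> keys;
        final String nextToken;

        Page(final List<String> keys, final String nextToken) {
            this.keys = keys;
            this.nextToken = nextToken;
        }
    }

    /**
     * Lists pages until nextToken is null. For each page it runs the ownership
     * check, then hands that page's keys to createPartitions.
     * Returns the total number of keys seen.
     */
    static int scan(final Function<String, Page> listPage,
                    final Consumer<List<String>> createPartitions,
                    final Runnable renewOwnershipIfNeeded) {
        int total = 0;
        String token = null;                  // null requests the first page
        do {
            final Page page = listPage.apply(token);
            renewOwnershipIfNeeded.run();     // checked on every page, even an empty one
            if (!page.keys.isEmpty()) {
                createPartitions.accept(page.keys);
            }
            total += page.keys.size();
            token = page.nextToken;
        } while (token != null);
        return total;
    }
}
```

Running the check per page rather than per partition means a long listing that yields few or no partitions still keeps the lease alive.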

sourceCoordinator.createPartitions(folderPartitionsForPage);
} else {
LOG.info("Creating partitions for {} S3 objects from bucket {}", partitionsForPage.size(), bucket);
sourceCoordinator.createPartitions(partitionsForPage);

Member

It is worth considering adding some way to update global state ownership periodically in this call to create partitions as this can potentially take a long time. I would probably have the LeaseBasedSourceCoordinator do this within this method.

Contributor Author

I have added a method to update global state ownership periodically.

Signed-off-by: Siqi Ding <dingdd@amazon.com>
return globalStateItemForPartitionCreation;
}

private void renewGlobalStateOwnershipForPartitionCreation() {

Member

We should be updating this ownership during the scan as well

Contributor Author

Have made the change and added the update during the scan.

Prevents timeout during long-running operations by renewing global state ownership every 3 minutes

Signed-off-by: Siqi Ding <dingdd@amazon.com>
Comment on lines +394 to +397
if (Duration.between(lastOwnershipRenewal, Instant.now()).compareTo(OWNERSHIP_RENEWAL_INTERVAL) >= 0) {
    sourceCoordinator.renewGlobalStateOwnershipForPartitionCreation();
    lastOwnershipRenewal = Instant.now();
}

Member

You should remove all of these; we don't need to update global state in the ScanObjectWorker.

Contributor Author

Got it. Will remove them.

Comment on lines +350 to +353
if (Duration.between(lastOwnershipRenewal, Instant.now()).compareTo(OWNERSHIP_RENEWAL_INTERVAL) >= 0) {
    sourceCoordinator.renewGlobalStateOwnershipForPartitionCreation();
    lastOwnershipRenewal = Instant.now();
}

Member

Same here: the global state is already done, so we shouldn't be renewing it.

Contributor Author

Will remove them.

Instant lastOwnershipRenewal = Instant.now();

for (final PartitionIdentifier partitionIdentifier : partitionIdentifiers) {
if (Instant.now().isAfter(lastOwnershipRenewal.plus(OWNERSHIP_RENEWAL_INTERVAL))) {

Member

We should handle the case where there are 0 partitions getting passed here but the bucket has many objects to scan. I would make this lastGlobalOwnershipRenewal a variable of the class, then check it one time before you go into the loop of partitions. Break this snippet of code out into a private method
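A rough sketch of the structure suggested here follows: the last-renewal timestamp is a class field, the check runs once before the loop so an empty partition list still renews, and the logic lives in a private method. `CoordinatorSketch`, its counters, and the 2-minute interval are assumptions for illustration only; the real `LeaseBasedSourceCoordinator` talks to a coordination store rather than incrementing counters.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for a lease-based coordinator; not the Data Prepper class. */
final class CoordinatorSketch {
    // Assumed value; the interval used in the PR may differ.
    private static final Duration OWNERSHIP_RENEWAL_INTERVAL = Duration.ofMinutes(2);

    private final Supplier<Instant> clock;
    private Instant lastGlobalOwnershipRenewal;  // class field, so it survives across calls

    int renewals = 0;           // counters exposed for illustration only
    int partitionsCreated = 0;

    CoordinatorSketch(final Supplier<Instant> clock) {
        this.clock = clock;
        this.lastGlobalOwnershipRenewal = clock.get();
    }

    void createPartitions(final List<String> partitionKeys) {
        renewGlobalOwnershipIfNeeded();          // runs even when the list is empty
        for (final String ignoredKey : partitionKeys) {
            renewGlobalOwnershipIfNeeded();      // keeps the lease alive during long loops
            partitionsCreated++;                 // placeholder for the real store write
        }
    }

    private void renewGlobalOwnershipIfNeeded() {
        final Instant now = clock.get();
        if (now.isAfter(lastGlobalOwnershipRenewal.plus(OWNERSHIP_RENEWAL_INTERVAL))) {
            renewals++;                          // placeholder for the real lease renewal
            lastGlobalOwnershipRenewal = now;
        }
    }
}
```

Because the timestamp is shared across calls, many consecutive pages that each produce zero partitions still trigger a renewal once the interval has passed.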

Contributor Author

Have made changes based on this comment. Thank you!

graytaylor0 previously approved these changes Sep 9, 2025
…nershipRenewal related renewGlobalOwnershipIfNeeded to ensure ownership is maintained both before starting and throughout the entire partition creation process

Signed-off-by: Siqi Ding <dingdd@amazon.com>
}

@Override
public void renewGlobalStateOwnershipForPartitionCreation() {

Member

I think this actually doesn't have to be public, right? Probably OK as is, though.

Contributor Author

Have changed to private. Thank you!

Signed-off-by: Siqi Ding <dingdd@amazon.com>
@oeyh oeyh merged commit 771feac into opensearch-project:main Sep 10, 2025
46 of 47 checks passed
3 participants