Add change to s3 scan source to create partitions after each page of …#6006
…objects listed
Signed-off-by: Taylor Gray <tylgry@amazon.com>
(cherry picked from commit bf74cb7)
Signed-off-by: Siqi Ding <dingdd@amazon.com>
e08270a to 359b7d0
private void createPartitions(final List<PartitionIdentifier> partitionIdentifiers) {
@Override
public void createPartitions(final List<PartitionIdentifier> partitionIdentifiers) {
Is the work to renew global partition ownership left out of this PR?
.collect(Collectors.toList()));
.collect(Collectors.toList());
LOG.info("Found page of {} objects from bucket {}", listObjectsV2Response.keyCount(), bucket);
We should have a check to update global state ownership here
    sourceCoordinator.createPartitions(folderPartitionsForPage);
} else {
    LOG.info("Creating partitions for {} S3 objects from bucket {}", partitionsForPage.size(), bucket);
    sourceCoordinator.createPartitions(partitionsForPage);
It is worth considering adding some way to update global state ownership periodically in this call to create partitions as this can potentially take a long time. I would probably have the LeaseBasedSourceCoordinator do this within this method.
Have added a method to update global state ownership periodically
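The per-page flow this thread refers to can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `PagedPartitionCreationSketch`, `Page`, and `createPartitionsPerPage` are hypothetical names, the listing is a plain function standing in for the paginated S3 listing, and the consumer stands in for `sourceCoordinator.createPartitions`.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch: hand each listed page to the coordinator as soon as it
// arrives, instead of collecting every object in the bucket first.
class PagedPartitionCreationSketch {

    // Stand-in for one page of a paginated listing (assumption, not an SDK type).
    static final class Page {
        final List<String> keys;
        final String nextToken; // null means there are no more pages

        Page(final List<String> keys, final String nextToken) {
            this.keys = keys;
            this.nextToken = nextToken;
        }
    }

    // listPage: continuation token (null for the first call) -> page of keys.
    // createPartitions: stand-in for the coordinator call made once per page.
    static int createPartitionsPerPage(final Function<String, Page> listPage,
                                       final Consumer<List<String>> createPartitions) {
        int total = 0;
        String token = null;
        do {
            final Page page = listPage.apply(token);
            createPartitions.accept(page.keys); // partitions exist before the next page is listed
            total += page.keys.size();
            token = page.nextToken;
        } while (token != null);
        return total;
    }

    public static void main(final String[] args) {
        final int total = createPartitionsPerPage(
                token -> token == null
                        ? new Page(List.of("a.json", "b.json"), "page-2")
                        : new Page(List.of("c.json"), null),
                keys -> System.out.println("Creating partitions for " + keys.size() + " objects"));
        System.out.println("Total objects: " + total);
    }
}
```

Because the coordinator call happens inside the listing loop, a large bucket means many such calls over a long period, which is why the reviewer asks for ownership renewal to live inside the partition-creation method.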
Signed-off-by: Siqi Ding <dingdd@amazon.com>
543cd61 to 550fc20
    return globalStateItemForPartitionCreation;
}

private void renewGlobalStateOwnershipForPartitionCreation() {
We should be updating this ownership during the scan as well
Have made the change and added the update during the scan.
Prevents timeout during long-running operations by renewing global state ownership every 3 minutes
Signed-off-by: Siqi Ding <dingdd@amazon.com>
if (Duration.between(lastOwnershipRenewal, Instant.now()).compareTo(OWNERSHIP_RENEWAL_INTERVAL) >= 0) {
    sourceCoordinator.renewGlobalStateOwnershipForPartitionCreation();
    lastOwnershipRenewal = Instant.now();
}
You should remove all of these; we don't need to update global state in the ScanObjectWorker.
Got it. Will remove them.
if (Duration.between(lastOwnershipRenewal, Instant.now()).compareTo(OWNERSHIP_RENEWAL_INTERVAL) >= 0) {
    sourceCoordinator.renewGlobalStateOwnershipForPartitionCreation();
    lastOwnershipRenewal = Instant.now();
}
Same here: the global state is already done, so we shouldn't be renewing it.
Will remove them.
Instant lastOwnershipRenewal = Instant.now();

for (final PartitionIdentifier partitionIdentifier : partitionIdentifiers) {
    if (Instant.now().isAfter(lastOwnershipRenewal.plus(OWNERSHIP_RENEWAL_INTERVAL))) {
We should handle the case where 0 partitions are passed here but the bucket has many objects to scan. I would make this lastGlobalOwnershipRenewal a variable of the class, then check it one time before you go into the loop of partitions. Break this snippet of code out into a private method.
Have made changes based on this comment. Thank you!
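The structure suggested in this thread might look roughly like the sketch below. It is an illustration under stated assumptions rather than the merged code: `OwnershipRenewingCoordinator` and `ManualClock` are hypothetical, partition identifiers are plain strings, the renewal itself is reduced to a counter, and the 3-minute interval is taken from the earlier commit message (the PR description mentions 2 minutes). A `java.time.Clock` is injected only so that elapsed time can be simulated.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.List;

// Hypothetical test clock whose time only moves when advance() is called.
class ManualClock extends Clock {
    private Instant now;

    ManualClock(final Instant start) { this.now = start; }

    void advance(final Duration duration) { now = now.plus(duration); }

    @Override public ZoneId getZone() { return ZoneOffset.UTC; }
    @Override public Clock withZone(final ZoneId zone) { return this; }
    @Override public Instant instant() { return now; }
}

// Hypothetical coordinator: the last renewal time is a field, so it survives
// across createPartitions calls, including calls that receive zero partitions.
class OwnershipRenewingCoordinator {
    // Assumed value; the thread mentions both 3 and 2 minutes.
    static final Duration OWNERSHIP_RENEWAL_INTERVAL = Duration.ofMinutes(3);

    private final Clock clock;
    private Instant lastGlobalOwnershipRenewal;
    int renewals = 0; // stands in for the real ownership update
    int created = 0;  // stands in for the real partition creation

    OwnershipRenewingCoordinator(final Clock clock) {
        this.clock = clock;
        this.lastGlobalOwnershipRenewal = clock.instant();
    }

    void createPartitions(final List<String> partitionIdentifiers) {
        // Checked once before the loop so an empty page still renews ownership.
        renewGlobalOwnershipIfNeeded();
        for (final String partitionIdentifier : partitionIdentifiers) {
            renewGlobalOwnershipIfNeeded();
            created++;
        }
    }

    private void renewGlobalOwnershipIfNeeded() {
        final Instant now = clock.instant();
        if (now.isAfter(lastGlobalOwnershipRenewal.plus(OWNERSHIP_RENEWAL_INTERVAL))) {
            renewals++;
            lastGlobalOwnershipRenewal = now;
        }
    }
}
```

Keeping the timestamp on the instance rather than as a local variable means a long run of empty or small pages cannot reset the timer on every call, which is the case the reviewer raises.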
…nershipRenewal related renewGlobalOwnershipIfNeeded to ensure ownership is maintained both before starting and throughout the entire partition creation process
Signed-off-by: Siqi Ding <dingdd@amazon.com>
384e649 to 793f5d3
}

@Override
public void renewGlobalStateOwnershipForPartitionCreation() {
I think this actually doesn't have to be public, right? Probably OK as is, though.
Have changed to private. Thank you!
Signed-off-by: Siqi Ding <dingdd@amazon.com>
(cherry picked from commit bf74cb7)
Description
Implements global state ownership renewal functionality to prevent ownership timeouts during long-running S3 partition creation operations:
Adds a renewGlobalStatePartitionOwnership() method to LeaseBasedSourceCoordinator and integrates periodic ownership renewal (every 2 minutes) in S3ScanPartitionCreationSupplier to maintain ownership during large S3 bucket scanning operations.
Issues Resolved
Resolves #5039
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.