Skip to content

Commit 384e649

Browse files
committed
Remove the each time update in ScanObjectWorker; Add the lastGlobalOwnershipRenewal related renewGlobalOwnershipIfNeeded to ensures ownership is maintained both before starting and throughout the entire partition creation process
1 parent e535221 commit 384e649

2 files changed

Lines changed: 12 additions & 22 deletions

File tree

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {
9696
private final ReentrantLock lock;
9797

9898
private Instant lastSupplierRunTime;
99+
private Instant lastGlobalOwnershipRenewal;
99100

100101
static final Duration FORCE_SUPPLIER_AFTER_DURATION = Duration.ofMinutes(5);
101102

@@ -138,6 +139,7 @@ public LeaseBasedSourceCoordinator(final Class<T> partitionProgressStateClass,
138139
this.partitionsDeleted = pluginMetrics.counter(PARTITIONS_DELETED);
139140
this.lock = new ReentrantLock();
140141
this.lastSupplierRunTime = Instant.now();
142+
this.lastGlobalOwnershipRenewal = Instant.now();
141143
}
142144

143145
@Override
@@ -206,16 +208,10 @@ private Optional<SourcePartition<T>> getNextPartitionInternal(final Function<Map
206208

207209
@Override
208210
public void createPartitions(final List<PartitionIdentifier> partitionIdentifiers) {
209-
Instant lastOwnershipRenewal = Instant.now();
211+
renewGlobalOwnershipIfNeeded();
210212

211213
for (final PartitionIdentifier partitionIdentifier : partitionIdentifiers) {
212-
if (Instant.now().isAfter(lastOwnershipRenewal.plus(OWNERSHIP_RENEWAL_INTERVAL))) {
213-
LOG.info("Renewing global state ownership for partition creation");
214-
renewGlobalStateOwnershipForPartitionCreation();
215-
lastOwnershipRenewal = Instant.now();
216-
}
217-
218-
214+
renewGlobalOwnershipIfNeeded();
219215

220216
final Optional<SourcePartitionStoreItem> optionalPartitionItem = sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, partitionIdentifier.getPartitionKey());
221217

@@ -542,6 +538,14 @@ public void renewGlobalStateOwnershipForPartitionCreation() {
542538
}
543539
}
544540

541+
private void renewGlobalOwnershipIfNeeded() {
542+
if (Instant.now().isAfter(lastGlobalOwnershipRenewal.plus(OWNERSHIP_RENEWAL_INTERVAL))) {
543+
LOG.info("Renewing global state ownership for partition creation");
544+
renewGlobalStateOwnershipForPartitionCreation();
545+
lastGlobalOwnershipRenewal = Instant.now();
546+
}
547+
}
548+
545549
private void giveUpAndSaveGlobalStateForPartitionCreation(final SourcePartitionStoreItem globalStateItemForPartitionCreation, final Map<String, Object> globalStateMap) {
546550

547551
globalStateItemForPartitionCreation.setSourcePartitionStatus(SourcePartitionStatus.UNASSIGNED);

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ public class ScanObjectWorker implements Runnable {
5858
private static final Integer MAX_OBJECTS_PER_ACKNOWLEDGMENT_SET = 1;
5959
static final Integer MAX_RETRIES = 5;
6060
static final Duration CHECKPOINT_OWNERSHIP_INTERVAL = Duration.ofMinutes(2);
61-
static final Duration OWNERSHIP_RENEWAL_INTERVAL = Duration.ofMinutes(3);
6261

6362
static final Duration NO_OBJECTS_FOUND_BEFORE_PARTITION_DELETION_DURATION = Duration.ofHours(1);
6463
private static final int RETRY_BACKOFF_ON_EXCEPTION_MILLIS = 5_000;
@@ -306,7 +305,6 @@ private void processFolderPartition(final SourcePartition<S3SourceProgressState>
306305
final String bucket = folderPartition.getPartitionKey().split("\\|")[0];
307306
final String s3Prefix = folderPartition.getPartitionKey().split("\\|")[1];
308307

309-
sourceCoordinator.renewGlobalStateOwnershipForPartitionCreation();
310308
final List<S3ObjectReference> objectsToProcess = getObjectsForPrefix(bucket, s3Prefix);
311309

312310
Optional<S3SourceProgressState> folderPartitionState = folderPartition.getPartitionState();
@@ -339,19 +337,12 @@ private void processFolderPartition(final SourcePartition<S3SourceProgressState>
339337
private List<S3ObjectReference> getObjectsForPrefix(final String bucket, final String s3Prefix) {
340338
ListObjectsV2Response listObjectsV2Response = null;
341339
final List<S3ObjectReference> objectsToProcess = new ArrayList<>();
342-
Instant lastOwnershipRenewal = Instant.now();
343340

344341
final ListObjectsV2Request.Builder listObjectsV2Request = ListObjectsV2Request.builder()
345342
.bucket(bucket)
346343
.prefix(s3Prefix);
347344

348345
do {
349-
// Renew ownership every 3 minutes during long listing operations
350-
if (Duration.between(lastOwnershipRenewal, Instant.now()).compareTo(OWNERSHIP_RENEWAL_INTERVAL) >= 0) {
351-
sourceCoordinator.renewGlobalStateOwnershipForPartitionCreation();
352-
lastOwnershipRenewal = Instant.now();
353-
}
354-
355346
listObjectsV2Response = s3Client.listObjectsV2(listObjectsV2Request
356347
.fetchOwner(true)
357348
.continuationToken(Objects.nonNull(listObjectsV2Response) ? listObjectsV2Response.nextContinuationToken() : null)
@@ -388,13 +379,8 @@ private void processObjectsForFolderPartition(final List<S3ObjectReference> obje
388379
int objectIndex = 0;
389380
String activeAcknowledgmentSetId = null;
390381
AcknowledgementSet acknowledgementSet = null;
391-
Instant lastOwnershipRenewal = Instant.now();
392382

393383
while (objectIndex < objectsToProcess.size() && objectsProcessed < folderPartitioningOptions.getMaxObjectsPerOwnership()) {
394-
if (Duration.between(lastOwnershipRenewal, Instant.now()).compareTo(OWNERSHIP_RENEWAL_INTERVAL) >= 0) {
395-
sourceCoordinator.renewGlobalStateOwnershipForPartitionCreation();
396-
lastOwnershipRenewal = Instant.now();
397-
}
398384
final S3ObjectReference s3ObjectReference = objectsToProcess.get(objectIndex);
399385
if (objectsProcessed % MAX_OBJECTS_PER_ACKNOWLEDGMENT_SET == 0) {
400386
if (acknowledgementSet != null) {

0 commit comments

Comments
 (0)