Skip to content

Commit e535221

Browse files
committed
Add ownership renewal during S3 partition creation
Prevents timeout during long-running operations by renewing global state ownership every 3 minutes Signed-off-by: Siqi Ding <dingdd@amazon.com>
1 parent 550fc20 commit e535221

3 files changed

Lines changed: 23 additions & 2 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,4 +164,9 @@ public interface SourceCoordinator<T> {
164164
void renewPartitionOwnership(final String partitionKey);
165165

166166
void deletePartition(final String partitionKey);
167+
168+
/**
169+
* Renews the global state ownership for partition creation to prevent timeout during long-running operations
170+
*/
171+
void renewGlobalStateOwnershipForPartitionCreation();
167172
}

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/sourcecoordination/LeaseBasedSourceCoordinator.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {
6161
static final String PARTITION_UPDATE_ERROR_COUNT = "PartitionUpdateErrors";
6262
static final Duration DEFAULT_LEASE_TIMEOUT = Duration.ofMinutes(10);
6363
static final Duration DEFAULT_RENEW_TIMEOUT = Duration.ofMinutes(3);
64+
static final Duration OWNERSHIP_RENEWAL_INTERVAL = Duration.ofMinutes(3);
6465

6566
private static final String hostName;
6667
static final String PARTITION_TYPE = "PARTITION";
@@ -208,7 +209,7 @@ public void createPartitions(final List<PartitionIdentifier> partitionIdentifier
208209
Instant lastOwnershipRenewal = Instant.now();
209210

210211
for (final PartitionIdentifier partitionIdentifier : partitionIdentifiers) {
211-
if (Instant.now().isAfter(lastOwnershipRenewal.plus(DEFAULT_RENEW_TIMEOUT))) {
212+
if (Instant.now().isAfter(lastOwnershipRenewal.plus(OWNERSHIP_RENEWAL_INTERVAL))) {
212213
LOG.info("Renewing global state ownership for partition creation");
213214
renewGlobalStateOwnershipForPartitionCreation();
214215
lastOwnershipRenewal = Instant.now();
@@ -526,7 +527,8 @@ private Optional<SourcePartitionStoreItem> acquireGlobalStateForPartitionCreatio
526527
return globalStateItemForPartitionCreation;
527528
}
528529

529-
private void renewGlobalStateOwnershipForPartitionCreation() {
530+
@Override
531+
public void renewGlobalStateOwnershipForPartitionCreation() {
530532
try {
531533
final Optional<SourcePartitionStoreItem> globalStateItem = sourceCoordinationStore.getSourcePartitionItem(
532534
sourceIdentifierWithGlobalStateType, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS);

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public class ScanObjectWorker implements Runnable {
5858
private static final Integer MAX_OBJECTS_PER_ACKNOWLEDGMENT_SET = 1;
5959
static final Integer MAX_RETRIES = 5;
6060
static final Duration CHECKPOINT_OWNERSHIP_INTERVAL = Duration.ofMinutes(2);
61+
static final Duration OWNERSHIP_RENEWAL_INTERVAL = Duration.ofMinutes(3);
6162

6263
static final Duration NO_OBJECTS_FOUND_BEFORE_PARTITION_DELETION_DURATION = Duration.ofHours(1);
6364
private static final int RETRY_BACKOFF_ON_EXCEPTION_MILLIS = 5_000;
@@ -305,6 +306,7 @@ private void processFolderPartition(final SourcePartition<S3SourceProgressState>
305306
final String bucket = folderPartition.getPartitionKey().split("\\|")[0];
306307
final String s3Prefix = folderPartition.getPartitionKey().split("\\|")[1];
307308

309+
sourceCoordinator.renewGlobalStateOwnershipForPartitionCreation();
308310
final List<S3ObjectReference> objectsToProcess = getObjectsForPrefix(bucket, s3Prefix);
309311

310312
Optional<S3SourceProgressState> folderPartitionState = folderPartition.getPartitionState();
@@ -337,12 +339,19 @@ private void processFolderPartition(final SourcePartition<S3SourceProgressState>
337339
private List<S3ObjectReference> getObjectsForPrefix(final String bucket, final String s3Prefix) {
338340
ListObjectsV2Response listObjectsV2Response = null;
339341
final List<S3ObjectReference> objectsToProcess = new ArrayList<>();
342+
Instant lastOwnershipRenewal = Instant.now();
340343

341344
final ListObjectsV2Request.Builder listObjectsV2Request = ListObjectsV2Request.builder()
342345
.bucket(bucket)
343346
.prefix(s3Prefix);
344347

345348
do {
349+
// Renew ownership every 3 minutes during long listing operations
350+
if (Duration.between(lastOwnershipRenewal, Instant.now()).compareTo(OWNERSHIP_RENEWAL_INTERVAL) >= 0) {
351+
sourceCoordinator.renewGlobalStateOwnershipForPartitionCreation();
352+
lastOwnershipRenewal = Instant.now();
353+
}
354+
346355
listObjectsV2Response = s3Client.listObjectsV2(listObjectsV2Request
347356
.fetchOwner(true)
348357
.continuationToken(Objects.nonNull(listObjectsV2Response) ? listObjectsV2Response.nextContinuationToken() : null)
@@ -379,8 +388,13 @@ private void processObjectsForFolderPartition(final List<S3ObjectReference> obje
379388
int objectIndex = 0;
380389
String activeAcknowledgmentSetId = null;
381390
AcknowledgementSet acknowledgementSet = null;
391+
Instant lastOwnershipRenewal = Instant.now();
382392

383393
while (objectIndex < objectsToProcess.size() && objectsProcessed < folderPartitioningOptions.getMaxObjectsPerOwnership()) {
394+
if (Duration.between(lastOwnershipRenewal, Instant.now()).compareTo(OWNERSHIP_RENEWAL_INTERVAL) >= 0) {
395+
sourceCoordinator.renewGlobalStateOwnershipForPartitionCreation();
396+
lastOwnershipRenewal = Instant.now();
397+
}
384398
final S3ObjectReference s3ObjectReference = objectsToProcess.get(objectIndex);
385399
if (objectsProcessed % MAX_OBJECTS_PER_ACKNOWLEDGMENT_SET == 0) {
386400
if (acknowledgementSet != null) {

0 commit comments

Comments
 (0)