Skip to content

Commit 300afca

Browse files
committed
Make acknowledgment_timeout configurable for s3 scan source, configure timeout to 10 minutes for MongoDB L2 transform
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent d310d1c commit 300afca

5 files changed

Lines changed: 23 additions & 12 deletions

File tree

data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/templates/documentdb-template.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
delete_s3_objects_on_read: true
6767
disable_s3_metadata_in_event: true
6868
scan:
69+
acknowledgment_timeout: "PT10M"
6970
folder_partitions:
7071
depth: "<<FUNCTION_NAME:calculateDepth,PARAMETER:$.<<pipeline-name>>.source.documentdb.s3_prefix>>"
7172
max_objects_per_ownership: 50
@@ -75,7 +76,7 @@
7576
filter:
7677
include_prefix: ["<<FUNCTION_NAME:getSourceCoordinationIdentifierEnvVariable,PARAMETER:$.<<pipeline-name>>.source.documentdb.s3_prefix>>"]
7778
scheduling:
78-
interval: "60s"
79+
interval: "20s"
7980
processor: "<<$.<<pipeline-name>>.processor>>"
8081
sink: "<<$.<<pipeline-name>>.sink>>"
8182
routes: "<<$.<<pipeline-name>>.routes>>" # In placeholder, routes or route (defined as alias) will be transformed to route in json as route will be primarily picked in pipelineModel.

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public List<PartitionIdentifier> apply(final Map<String, Object> globalStateMap)
9090
final Instant updatedScanTime = Instant.now();
9191
if (Objects.nonNull(s3ScanKeyPathOption) && Objects.nonNull(s3ScanKeyPathOption.getS3scanIncludePrefixOptions()))
9292
s3ScanKeyPathOption.getS3scanIncludePrefixOptions().forEach(includePath -> {
93+
LOG.info("Include path: {}", includePath);
9394
listObjectsV2Request.prefix(includePath);
9495
objectsToProcess.addAll(listFilteredS3ObjectsForBucket(excludeItems, listObjectsV2Request,
9596
scanOptions.getBucketOption().getName(), scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime(), globalStateMap));

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,6 @@ public class ScanObjectWorker implements Runnable {
5656

5757
static final Duration NO_OBJECTS_FOUND_BEFORE_PARTITION_DELETION_DURATION = Duration.ofHours(1);
5858
private static final int RETRY_BACKOFF_ON_EXCEPTION_MILLIS = 5_000;
59-
60-
static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(2);
6159
static final String ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME = "acknowledgementSetCallbackCounter";
6260

6361
static final String NO_OBJECTS_FOUND_FOR_FOLDER_PARTITION = "folderPartitionNoObjectsFound";
@@ -99,6 +97,8 @@ public class ScanObjectWorker implements Runnable {
9997

10098
private final Map<String, AtomicInteger> acknowledgmentsRemainingForPartitions;
10199

100+
private final Duration acknowledgmentSetTimeout;
101+
102102
public ScanObjectWorker(final S3Client s3Client,
103103
final List<ScanOptions> scanOptionsBuilderList,
104104
final S3ObjectHandler s3ObjectHandler,
@@ -127,6 +127,7 @@ public ScanObjectWorker(final S3Client s3Client,
127127
this.sourceCoordinator.initialize();
128128
this.partitionKeys = new ArrayList<>();
129129
this.folderPartitioningOptions = s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions();
130+
this.acknowledgmentSetTimeout = s3SourceConfig.getS3ScanScanOptions().getAcknowledgmentTimeout();
130131

131132
this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions());
132133
this.acknowledgmentsRemainingForPartitions = new ConcurrentHashMap<>();
@@ -214,7 +215,7 @@ private void startProcessingObject(final long waitTimeMillis) {
214215
sourceCoordinator.giveUpPartition(objectToProcess.get().getPartitionKey());
215216
}
216217
partitionKeys.remove(objectToProcess.get().getPartitionKey());
217-
}, ACKNOWLEDGEMENT_SET_TIMEOUT);
218+
}, acknowledgmentSetTimeout);
218219

219220
addProgressCheck(acknowledgementSet, objectToProcess.get());
220221
}
@@ -226,7 +227,7 @@ private void startProcessingObject(final long waitTimeMillis) {
226227
if (endToEndAcknowledgementsEnabled) {
227228
deleteObjectRequest.ifPresent(deleteRequest -> objectsToDeleteForAcknowledgmentSets.put(objectToProcess.get().getPartitionKey(), Set.of(deleteRequest)));
228229
try {
229-
sourceCoordinator.updatePartitionForAcknowledgmentWait(objectToProcess.get().getPartitionKey(), ACKNOWLEDGEMENT_SET_TIMEOUT);
230+
sourceCoordinator.updatePartitionForAcknowledgmentWait(objectToProcess.get().getPartitionKey(), acknowledgmentSetTimeout);
230231
} catch (final PartitionUpdateException e) {
231232
LOG.debug("Failed to update the partition for the acknowledgment wait.");
232233
}
@@ -375,7 +376,7 @@ private void processObjectsForFolderPartition(final List<S3ObjectReference> obje
375376
objectIndex++;
376377
}
377378

378-
sourceCoordinator.updatePartitionForAcknowledgmentWait(folderPartition.getPartitionKey(), ACKNOWLEDGEMENT_SET_TIMEOUT);
379+
sourceCoordinator.updatePartitionForAcknowledgmentWait(folderPartition.getPartitionKey(), acknowledgmentSetTimeout);
379380

380381
if (acknowledgementSet != null) {
381382
acknowledgementSet.complete();
@@ -402,7 +403,7 @@ private AcknowledgementSet createAcknowledgmentSetForFolderPartition(final Sourc
402403
LOG.info("Received all acknowledgments for folder partition {}, giving up this partition", folderPartition.getPartitionKey());
403404
sourceCoordinator.giveUpPartition(folderPartition.getPartitionKey(), Instant.now());
404405
}
405-
}, ACKNOWLEDGEMENT_SET_TIMEOUT);
406+
}, acknowledgmentSetTimeout);
406407
}
407408

408409
private void addProgressCheck(final AcknowledgementSet acknowledgementSet, final SourcePartition<S3SourceProgressState> objectToProcess) {

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/S3ScanScanOptions.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
*/
2323
public class S3ScanScanOptions {
2424

25+
@JsonProperty("acknowledgment_timeout")
26+
private Duration acknowledgmentTimeout = Duration.ofHours(2);
27+
2528
@JsonProperty("folder_partitions")
2629
@Valid
2730
private FolderPartitioningOptions folderPartitioningOptions;
@@ -84,4 +87,6 @@ public S3ScanSchedulingOptions getSchedulingOptions() {
8487
}
8588

8689
public FolderPartitioningOptions getPartitioningOptions() { return folderPartitioningOptions; }
90+
91+
public Duration getAcknowledgmentTimeout() { return acknowledgmentTimeout; }
8792
}

data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@
6969
import static org.mockito.Mockito.when;
7070
import static org.opensearch.dataprepper.model.source.s3.S3ScanEnvironmentVariables.STOP_S3_SCAN_PROCESSING_PROPERTY;
7171
import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME;
72-
import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.ACKNOWLEDGEMENT_SET_TIMEOUT;
7372
import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.CHECKPOINT_OWNERSHIP_INTERVAL;
7473
import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.NO_OBJECTS_FOUND_BEFORE_PARTITION_DELETION_DURATION;
7574
import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.NO_OBJECTS_FOUND_FOR_FOLDER_PARTITION;
@@ -125,10 +124,14 @@ class S3ScanObjectWorkerTest {
125124

126125
private List<ScanOptions> scanOptionsList;
127126

127+
@Mock
128+
private Duration acknowledgmentSetTimeout;
129+
128130
@BeforeEach
129131
void setup() {
130132
scanOptionsList = new ArrayList<>();
131133
when(s3ScanScanOptions.getPartitioningOptions()).thenReturn(null);
134+
when(s3ScanScanOptions.getAcknowledgmentTimeout()).thenReturn(acknowledgmentSetTimeout);
132135
}
133136

134137
private ScanObjectWorker createObjectUnderTest() {
@@ -232,7 +235,7 @@ void buildDeleteObjectRequest_should_be_invoked_after_processing_when_deleteS3Ob
232235

233236
final InOrder inOrder = inOrder(sourceCoordinator, acknowledgementSet, s3ObjectDeleteWorker);
234237
inOrder.verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, objectKey);
235-
inOrder.verify(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, ACKNOWLEDGEMENT_SET_TIMEOUT);
238+
inOrder.verify(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, acknowledgmentSetTimeout);
236239
inOrder.verify(acknowledgementSet).complete();
237240
inOrder.verify(sourceCoordinator).renewPartitionOwnership(partitionKey);
238241
inOrder.verify(sourceCoordinator).completePartition(partitionKey, true);
@@ -289,7 +292,7 @@ void acknowledgment_progress_check_increments_ownership_error_metric_when_partit
289292

290293
final InOrder inOrder = inOrder(sourceCoordinator, acknowledgementSet, s3ObjectDeleteWorker);
291294
inOrder.verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, objectKey);
292-
inOrder.verify(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, ACKNOWLEDGEMENT_SET_TIMEOUT);
295+
inOrder.verify(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, acknowledgmentSetTimeout);
293296
inOrder.verify(acknowledgementSet).complete();
294297
inOrder.verify(sourceCoordinator).renewPartitionOwnership(partitionKey);
295298
inOrder.verify(sourceCoordinator).completePartition(partitionKey, true);
@@ -536,7 +539,7 @@ void processing_with_folder_partition_processes_objects_in_folder_and_deletes_th
536539
inOrder.verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, firstObject.key());
537540
inOrder.verify(acknowledgementSet1).complete();
538541
inOrder.verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, secondObject.key());
539-
inOrder.verify(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, ACKNOWLEDGEMENT_SET_TIMEOUT);
542+
inOrder.verify(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, acknowledgmentSetTimeout);
540543
inOrder.verify(acknowledgementSet2).complete();
541544

542545
final Consumer<Boolean> firstAckCallback = ackCallbacks.get(0);
@@ -616,7 +619,7 @@ void processing_with_folder_partition_processes_objects_in_folder_until_max_obje
616619
final InOrder inOrder = inOrder(sourceCoordinator, acknowledgementSet1, s3ObjectDeleteWorker);
617620

618621
inOrder.verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, firstObject.key());
619-
inOrder.verify(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, ACKNOWLEDGEMENT_SET_TIMEOUT);
622+
inOrder.verify(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, acknowledgmentSetTimeout);
620623
inOrder.verify(acknowledgementSet1).complete();
621624

622625
final Consumer<Boolean> ackCallback = consumerArgumentCaptor.getValue();

0 commit comments

Comments
 (0)