Skip to content

Commit 088b2b3

Browse files
graytaylor0wandna-amazon
authored andcommitted
Increase acknowledgment set timeout for opensearch source (opensearch-project#6291)
Signed-off-by: Taylor Gray <tylgry@amazon.com> Signed-off-by: Nathan Wand <wandna@amazon.com>
1 parent 221c6c9 commit 088b2b3

2 files changed

Lines changed: 8 additions & 3 deletions

File tree

  • data-prepper-plugins/opensearch/src

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@ public class WorkerCommonUtils {
2828
static final Duration BACKOFF_ON_EXCEPTION = Duration.ofSeconds(60);
2929

3030
static final long DEFAULT_CHECKPOINT_INTERVAL_MILLS = 5 * 60_000;
31-
static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofMinutes(20);
31+
32+
// Set acknowledgment timeout to very high value to handle large indexes.
33+
// In case of failure the retries will be handled by source coordination
34+
static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(1000);
35+
static final Duration OWNERSHIP_TIMEOUT = Duration.ofMinutes(30);
3236
static final Duration STARTING_BACKOFF = Duration.ofMillis(500);
3337
static final Duration MAX_BACKOFF = Duration.ofSeconds(60);
3438
static final int BACKOFF_RATE = 2;
@@ -64,7 +68,7 @@ static void completeIndexPartition(final OpenSearchSourceConfiguration openSearc
6468
final SourcePartition<OpenSearchIndexProgressState> indexPartition,
6569
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator) {
6670
if (openSearchSourceConfiguration.isAcknowledgmentsEnabled()) {
67-
sourceCoordinator.updatePartitionForAcknowledgmentWait(indexPartition.getPartitionKey(), ACKNOWLEDGEMENT_SET_TIMEOUT);
71+
sourceCoordinator.updatePartitionForAcknowledgmentWait(indexPartition.getPartitionKey(), OWNERSHIP_TIMEOUT);
6872
acknowledgementSet.complete();
6973
} else {
7074
sourceCoordinator.closePartition(

data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker.EXTEND_KEEP_ALIVE_TIME;
7070
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker.STARTING_KEEP_ALIVE;
7171
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.ACKNOWLEDGEMENT_SET_TIMEOUT;
72+
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.OWNERSHIP_TIMEOUT;
7273

7374
@ExtendWith(MockitoExtension.class)
7475
public class PitWorkerTest {
@@ -316,7 +317,7 @@ void run_with_acknowledgments_enabled_creates_and_deletes_pit_and_closes_that_pa
316317
when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO);
317318
when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration);
318319

319-
doNothing().when(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, ACKNOWLEDGEMENT_SET_TIMEOUT);
320+
doNothing().when(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, OWNERSHIP_TIMEOUT);
320321
doNothing().when(sourceCoordinator).closePartition(partitionKey,
321322
Duration.ZERO, 1, true);
322323

0 commit comments

Comments
 (0)