Skip to content

Commit e08c016

Browse files
chrisale000simonelbaz
authored andcommitted
feat: Add configurable lease interval for crawler source (opensearch-project#6432)
This change adds support for configurable lease interval in the crawler source plugin, allowing users to customize the leader scheduler's lease interval instead of using a hardcoded value. Changes: - Added getLeaseInterval() method to CrawlerSourceConfig interface with default value of 1 minute - Modified CrawlerSourcePlugin to use the configurable lease interval from the source configuration Signed-off-by: Alexander Christensen <alchrisk@amazon.com> Signed-off-by: Simon ELBAZ <elbazsimon9@gmail.com>
1 parent 48813a8 commit e08c016

2 files changed

Lines changed: 12 additions & 1 deletion

File tree

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,14 @@ default Duration getDurationToGiveUpRetry() {
4545
default Duration getDurationToDelayRetry() {
4646
return DEFAULT_MAX_DURATION_TO_DELAY_RETRY;
4747
}
48+
49+
/**
50+
* Gets the lease interval for the leader scheduler.
51+
* Defaults to 1 minute if not overridden.
52+
*
53+
* @return Duration for lease interval
54+
*/
55+
default Duration getLeaseInterval() {
56+
return Duration.ofMinutes(1);
57+
}
4858
}

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ public void start(Buffer<Record<Event>> buffer) {
7070
boolean isPartitionCreated = coordinator.createPartition(leaderPartition);
7171
log.debug("Leader partition creation status: {}", isPartitionCreated);
7272

73-
Runnable leaderScheduler = new LeaderScheduler(coordinator, crawler);
73+
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, crawler);
74+
leaderScheduler.setLeaseInterval(sourceConfig.getLeaseInterval());
7475
this.executorService.submit(leaderScheduler);
7576
//Register worker threaders
7677
for (int i = 0; i < sourceConfig.getNumberOfWorkers(); i++) {

0 commit comments

Comments
 (0)