diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java index 3342ac403d..efee2cac3e 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java @@ -45,4 +45,14 @@ default Duration getDurationToGiveUpRetry() { default Duration getDurationToDelayRetry() { return DEFAULT_MAX_DURATION_TO_DELAY_RETRY; } + + /** + * Gets the lease interval for the leader scheduler. + * Defaults to 1 minute if not overridden. + * + * @return Duration for lease interval + */ + default Duration getLeaseInterval() { + return Duration.ofMinutes(1); + } } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java index 7b758e9d6e..52021be6d3 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java @@ -70,7 +70,8 @@ public void start(Buffer> buffer) { boolean isPartitionCreated = coordinator.createPartition(leaderPartition); log.debug("Leader partition creation status: {}", isPartitionCreated); - Runnable leaderScheduler = new LeaderScheduler(coordinator, crawler); + LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, crawler); + leaderScheduler.setLeaseInterval(sourceConfig.getLeaseInterval()); this.executorService.submit(leaderScheduler); //Register worker threaders for (int i = 0; i < sourceConfig.getNumberOfWorkers(); i++) {