From 60fe1b40a05156e17c4868e23148fce536f870ab Mon Sep 17 00:00:00 2001 From: Alexander Christensen Date: Mon, 26 Jan 2026 14:01:48 -0800 Subject: [PATCH] feat: Add configurable lease interval for crawler source This change adds support for configurable lease interval in the crawler source plugin, allowing users to customize the leader scheduler's lease interval instead of using a hardcoded value. Changes: - Added getLeaseInterval() method to CrawlerSourceConfig interface with default value of 1 minute - Modified CrawlerSourcePlugin to use the configurable lease interval from the source configuration Signed-off-by: Alexander Christensen --- .../source_crawler/base/CrawlerSourceConfig.java | 10 ++++++++++ .../source_crawler/base/CrawlerSourcePlugin.java | 3 ++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java index 3342ac403d..efee2cac3e 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java @@ -45,4 +45,14 @@ default Duration getDurationToGiveUpRetry() { default Duration getDurationToDelayRetry() { return DEFAULT_MAX_DURATION_TO_DELAY_RETRY; } + + /** + * Gets the lease interval for the leader scheduler. + * Defaults to 1 minute if not overridden. + * + * @return Duration for lease interval + */ + default Duration getLeaseInterval() { + return Duration.ofMinutes(1); + } } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java index 7b758e9d6e..52021be6d3 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java @@ -70,7 +70,8 @@ public void start(Buffer> buffer) { boolean isPartitionCreated = coordinator.createPartition(leaderPartition); log.debug("Leader partition creation status: {}", isPartitionCreated); - Runnable leaderScheduler = new LeaderScheduler(coordinator, crawler); + LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, crawler); + leaderScheduler.setLeaseInterval(sourceConfig.getLeaseInterval()); this.executorService.submit(leaderScheduler); //Register worker threaders for (int i = 0; i < sourceConfig.getNumberOfWorkers(); i++) {