diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenCrawlerClient.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenCrawlerClient.java new file mode 100644 index 0000000000..31ac6efe40 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenCrawlerClient.java @@ -0,0 +1,31 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.base; + +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState; +import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; + +import java.util.Iterator; + +/** + * Interface for Crawler client to support token-based pagination. This interface can + * be implemented by different saas clients. + */ +public interface TokenCrawlerClient extends CrawlerClient { + + /** + * This will be the main API called by crawler. This method assumes that {@link + * CrawlerSourceConfig} is available as a member to {@link CrawlerClient}, as a result of + * which, other scanning properties will also be available to this method + * + * @return returns an {@link Iterator} of {@link ItemInfo} + */ + Iterator listItems(String lastToken); + + void executePartition(PaginationCrawlerWorkerProgressState state, + Buffer> buffer, + AcknowledgementSet acknowledgementSet); +} + diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenLeaderProgressState.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenLeaderProgressState.java new file mode 100644 index 0000000000..45a49c6e44 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenLeaderProgressState.java @@ -0,0 +1,17 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.base; + +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class") +@JsonSubTypes({ + @JsonSubTypes.Type(value = TokenPaginationCrawlerLeaderProgressState.class) +}) +public interface TokenLeaderProgressState { + + String getLastToken(); + + void setLastToken(String token); +} + diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenPaginationCrawler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenPaginationCrawler.java new file mode 100644 index 0000000000..17bef9afe1 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenPaginationCrawler.java @@ -0,0 +1,124 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.base; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState; +import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Named; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.LeaderScheduler.DEFAULT_EXTEND_LEASE_MINUTES; + +@Named +public class TokenPaginationCrawler implements Crawler { + private static final Logger log = LoggerFactory.getLogger(TokenPaginationCrawler.class); + private static final int batchSize = 50; + private static final String PAGINATION_WORKER_PARTITIONS_CREATED = "paginationWorkerPartitionsCreated"; + private static final String INVALID_PAGINATION_ITEMS = "invalidPaginationItems"; + private final Timer crawlingTimer; + private final CrawlerClient client; + private final Counter parititionsCreatedCounter; + private final Counter invalidPaginationItemsCounter; + + public TokenPaginationCrawler(CrawlerClient client, + PluginMetrics pluginMetrics) { + this.client = client; + this.crawlingTimer = pluginMetrics.timer("crawlingTime"); + this.parititionsCreatedCounter = pluginMetrics.counter(PAGINATION_WORKER_PARTITIONS_CREATED); + this.invalidPaginationItemsCounter = pluginMetrics.counter(INVALID_PAGINATION_ITEMS); + + } + + @Override + public Instant crawl(LeaderPartition leaderPartition, + EnhancedSourceCoordinator coordinator) { + long startTime = System.currentTimeMillis(); + Instant lastLeaderSavedInstant = Instant.now(); + TokenPaginationCrawlerLeaderProgressState leaderProgressState = (TokenPaginationCrawlerLeaderProgressState) leaderPartition + .getProgressState().get(); + String lastToken = leaderProgressState.getLastToken(); + Iterator itemInfoIterator = ((TokenCrawlerClient) client).listItems(lastToken); + String latestToken = lastToken; + log.info("Starting to crawl the source with last item ID: {}", lastToken); + do { + final List itemInfoList = new ArrayList<>(); + for (int i = 0; i < batchSize && itemInfoIterator.hasNext(); i++) { + final ItemInfo nextItem = itemInfoIterator.next(); + if (nextItem == null) { + //we don't expect null items, but just in case, we'll skip them + log.warn("Unexpected encounter of a null item while processing batch with last item ID " + lastToken); + invalidPaginationItemsCounter.increment(); + continue; + } + itemInfoList.add(nextItem); + if (nextItem.getItemId() != null) { + latestToken = nextItem.getItemId(); + } + } + if (!itemInfoList.isEmpty()) { + createPartition(itemInfoList, coordinator); + } + // Check point leader progress state at every minute interval. + Instant currentTimeInstance = Instant.now(); + if (Duration.between(lastLeaderSavedInstant, currentTimeInstance).toMinutes() >= 1) { + // intermediate updates to master partition state + updateLeaderProgressState(leaderPartition, latestToken, coordinator); + lastLeaderSavedInstant = currentTimeInstance; + } + } while (itemInfoIterator.hasNext()); + updateLeaderProgressState(leaderPartition, latestToken, coordinator); + long crawlTimeMillis = System.currentTimeMillis() - startTime; + log.debug("Crawling completed in {} ms", crawlTimeMillis); + crawlingTimer.record(crawlTimeMillis, TimeUnit.MILLISECONDS); + return Instant.now(); // Return current time as required by Crawler interface + } + + public void executePartition(PaginationCrawlerWorkerProgressState state, Buffer> buffer, AcknowledgementSet acknowledgementSet) { + client.executePartition(state, buffer, acknowledgementSet); + } + + private void updateLeaderProgressState(LeaderPartition leaderPartition, String updatedToken, EnhancedSourceCoordinator coordinator) { + TokenPaginationCrawlerLeaderProgressState leaderProgressState = (TokenPaginationCrawlerLeaderProgressState) leaderPartition + .getProgressState().get(); + String oldToken = leaderProgressState.getLastToken(); + leaderProgressState.setLastToken(updatedToken); + leaderPartition.setLeaderProgressState(leaderProgressState); + coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES); + log.info("Updated leader progress state: old lastToken={}, new lastToken={}", oldToken, updatedToken); + } + + private void createPartition(List itemInfoList, EnhancedSourceCoordinator coordinator) { + if (itemInfoList.isEmpty()) { + return; + } + ItemInfo itemInfo = itemInfoList.get(0); + String partitionKey = itemInfo.getPartitionKey(); + List itemIds = itemInfoList.stream().map(ItemInfo::getId).collect(Collectors.toList()); + PaginationCrawlerWorkerProgressState state = new PaginationCrawlerWorkerProgressState(); + state.setKeyAttributes(itemInfo.getKeyAttributes()); + state.setItemIds(itemIds); + state.setExportStartTime(Instant.now()); + state.setLoadedItems(itemInfoList.size()); + SaasSourcePartition sourcePartition = new SaasSourcePartition(state, partitionKey); + coordinator.createPartition(sourcePartition); + parititionsCreatedCounter.increment(); + } +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/partition/LeaderPartition.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/partition/LeaderPartition.java index eff777d35a..b7e42dad43 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/partition/LeaderPartition.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/partition/LeaderPartition.java @@ -1,5 +1,6 @@ package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition; +import org.opensearch.dataprepper.plugins.source.source_crawler.base.TokenLeaderProgressState; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -56,7 +57,22 @@ public Optional getProgressState() { } public void setLeaderProgressState(LeaderProgressState state) { - this.state.setLastPollTime(state.getLastPollTime()); + boolean stateIsToken = state instanceof TokenLeaderProgressState; + boolean thisStateIsToken = this.state instanceof TokenLeaderProgressState; + + if (stateIsToken != thisStateIsToken) { + // Validate that the states are not inconsistent + // We don't expect to reach here. + throw new RuntimeException("Leader partition progress state type mismatch: " + + "Provided state type: " + state.getClass().getSimpleName() + + ", Current state type: " + this.state.getClass().getSimpleName()); + } + + if (state instanceof TokenLeaderProgressState) { + ((TokenLeaderProgressState) this.state).setLastToken(((TokenLeaderProgressState) state).getLastToken()); + } else { + this.state.setLastPollTime(state.getLastPollTime()); + } } /** @@ -77,4 +93,4 @@ public LeaderProgressState convertToPartitionState(final String serializedPartit return null; } } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/TokenPaginationCrawlerLeaderProgressState.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/TokenPaginationCrawlerLeaderProgressState.java new file mode 100644 index 0000000000..b61e721071 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/TokenPaginationCrawlerLeaderProgressState.java @@ -0,0 +1,27 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state; + +import org.opensearch.dataprepper.plugins.source.source_crawler.base.TokenLeaderProgressState; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.opensearch.dataprepper.plugins.source.source_crawler.base.LeaderProgressState; + +import java.time.Instant; + +@Data +@NoArgsConstructor +public class TokenPaginationCrawlerLeaderProgressState implements TokenLeaderProgressState, LeaderProgressState { + + @JsonProperty("initialized") + private boolean initialized = false; + + @JsonProperty("last_token") + private String lastToken; + + private Instant lastPollTime; + + public TokenPaginationCrawlerLeaderProgressState(@JsonProperty("last_token") final String lastToken) { + this.lastToken = lastToken; + } +} + diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenPaginationCrawlerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenPaginationCrawlerTest.java new file mode 100644 index 0000000000..9bc2daee78 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/TokenPaginationCrawlerTest.java @@ -0,0 +1,165 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.base; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState; +import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; +import org.opensearch.dataprepper.plugins.source.source_crawler.model.TestItemInfo; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.times; + +@ExtendWith(MockitoExtension.class) +public class TokenPaginationCrawlerTest { + private static final int DEFAULT_BATCH_SIZE = 50; + private static final String INITIAL_TOKEN = "initial-token"; + @Mock + private AcknowledgementSet acknowledgementSet; + @Mock + private EnhancedSourceCoordinator coordinator; + @Mock + private Buffer> buffer; + @Mock + private TokenCrawlerClient client; + @Mock + private PaginationCrawlerWorkerProgressState state; + @Mock + private LeaderPartition leaderPartition; + private Crawler crawler; + private final PluginMetrics pluginMetrics = PluginMetrics.fromNames("CrawlerTest", "crawler"); + + @BeforeEach + public void setup() { + crawler = new TokenPaginationCrawler(client, pluginMetrics); + when(leaderPartition.getProgressState()).thenReturn(Optional.of(new TokenPaginationCrawlerLeaderProgressState(INITIAL_TOKEN))); + } + + @Test + public void crawlerConstructionTest() { + reset(leaderPartition); + assertNotNull(crawler); + } + + @Test + public void executePartitionTest() { + reset(leaderPartition); + crawler.executePartition(state, buffer, acknowledgementSet); + verify(client).executePartition(state, buffer, acknowledgementSet); + } + + @Test + void testCrawlWithEmptyList() { + String lastToken = INITIAL_TOKEN; + when(client.listItems(lastToken)).thenReturn(Collections.emptyIterator()); + when(leaderPartition.getProgressState()).thenReturn(Optional.of(new TokenPaginationCrawlerLeaderProgressState(lastToken))); + crawler.crawl(leaderPartition, coordinator); + verify(coordinator, never()).createPartition(any(SaasSourcePartition.class)); + } + + @Test + void testCrawlWithNonEmptyList() { + List itemInfoList = new ArrayList<>(); + for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) { + itemInfoList.add(new TestItemInfo("itemId" + i)); + } + when(client.listItems(INITIAL_TOKEN)).thenReturn(itemInfoList.iterator()); + crawler.crawl(leaderPartition, coordinator); + verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class)); + } + + @Test + void testCrawlWithMultiplePartitions() { + List itemInfoList = new ArrayList<>(); + for (int i = 0; i < DEFAULT_BATCH_SIZE + 1; i++) { + itemInfoList.add(new TestItemInfo("testId" + i)); + } + when(client.listItems(INITIAL_TOKEN)).thenReturn(itemInfoList.iterator()); + crawler.crawl(leaderPartition, coordinator); + verify(coordinator, times(2)).createPartition(any(SaasSourcePartition.class)); + } + + @Test + void testBatchSize() { + List itemInfoList = new ArrayList<>(); + int maxItemsPerPage = DEFAULT_BATCH_SIZE; + for (int i = 0; i < maxItemsPerPage; i++) { + itemInfoList.add(new TestItemInfo("testId" + i)); + } + when(client.listItems(INITIAL_TOKEN)).thenReturn(itemInfoList.iterator()); + crawler.crawl(leaderPartition, coordinator); + int expectedNumberOfInvocations = 1; + verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class)); + + List itemInfoList2 = new ArrayList<>(); + for (int i = 0; i < maxItemsPerPage * 2; i++) { + itemInfoList2.add(new TestItemInfo("testId" + i)); + } + when(client.listItems(anyString())).thenReturn(itemInfoList2.iterator()); + crawler.crawl(leaderPartition, coordinator); + expectedNumberOfInvocations += 2; + verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class)); + } + + @Test + void testCrawlWithNullItemsInList() { + List itemInfoList = new ArrayList<>(); + itemInfoList.add(null); + for (int i = 0; i < DEFAULT_BATCH_SIZE - 1; i++) { + itemInfoList.add(new TestItemInfo("testId" + i)); + } + when(client.listItems(INITIAL_TOKEN)).thenReturn(itemInfoList.iterator()); + crawler.crawl(leaderPartition, coordinator); + verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class)); + } + + @Test + void testUpdatingTokenNullMetaData() { + List itemInfoList = new ArrayList<>(); + ItemInfo testItem = createTestItemInfo("1"); + itemInfoList.add(testItem); + when(client.listItems(INITIAL_TOKEN)).thenReturn(itemInfoList.iterator()); + crawler.crawl(leaderPartition, coordinator); + assertEquals("1", ((TokenLeaderProgressState) leaderPartition.getProgressState().get()).getLastToken()); + } + + @Test + void testUpdatedTokenCreatedNewer() { + String lastToken = INITIAL_TOKEN; + List itemInfoList = new ArrayList<>(); + ItemInfo testItem = createTestItemInfo("1"); + itemInfoList.add(testItem); + when(client.listItems(lastToken)).thenReturn(itemInfoList.iterator()); + crawler.crawl(leaderPartition, coordinator); + assertEquals("1", ((TokenLeaderProgressState) leaderPartition.getProgressState().get()).getLastToken()); + } + + private ItemInfo createTestItemInfo(String id) { + return new TestItemInfo(id, new HashMap<>(), Instant.now()); + } +} diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java index d97b1a19ba..d37918d418 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java @@ -10,6 +10,7 @@ import org.opensearch.dataprepper.plugins.source.source_crawler.base.Crawler; import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition; import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerLeaderProgressState; +import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState; import java.time.Duration; import java.time.Instant; @@ -26,10 +27,13 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import static org.junit.jupiter.api.Assertions.assertThrows; @ExtendWith(MockitoExtension.class) public class LeaderSchedulerTest { + private static String LAST_TOKEN = "sample-token-123"; + @Mock private EnhancedSourceCoordinator coordinator; @Mock @@ -47,6 +51,29 @@ void testUnableToAcquireLeaderPartition() throws InterruptedException { verifyNoInteractions(crawler); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testTokenPaginationCrawlerLeaderPartitionsCreation(boolean initializationState) throws InterruptedException { + LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, crawler); + LeaderPartition leaderPartition = new LeaderPartition(new TokenPaginationCrawlerLeaderProgressState("")); + TokenPaginationCrawlerLeaderProgressState state = (TokenPaginationCrawlerLeaderProgressState) leaderPartition.getProgressState().get(); + state.setInitialized(initializationState); + state.setLastToken(LAST_TOKEN); + given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition)); + doThrow(RuntimeException.class).when(coordinator).saveProgressStateForPartition(any(LeaderPartition.class), any(Duration.class)); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(leaderScheduler); + + Thread.sleep(100); + executorService.shutdownNow(); + + // Check if crawler was invoked and updated leader lease renewal time + verify(crawler, times(1)).crawl(leaderPartition, coordinator); + verify(coordinator, times(1)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class)); + + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void testPaginationCrawlerLeaderPartitionsCreation(boolean initializationState) throws InterruptedException { @@ -90,6 +117,30 @@ void testExceptionWhileAcquiringLeaderPartition(boolean initializationState) thr verifyNoInteractions(crawler); } + @Test + void testTokenPaginationCrawlerWhileLoopRunnningAfterTheSleep() throws InterruptedException { + LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, crawler); + leaderScheduler.setLeaseInterval(Duration.ofMillis(10)); + LeaderPartition leaderPartition = new LeaderPartition(new TokenPaginationCrawlerLeaderProgressState("")); + TokenPaginationCrawlerLeaderProgressState state = (TokenPaginationCrawlerLeaderProgressState) leaderPartition.getProgressState().get(); + state.setInitialized(false); + state.setLastToken(LAST_TOKEN); + when(crawler.crawl(any(LeaderPartition.class), any(EnhancedSourceCoordinator.class))).thenReturn(Instant.ofEpochMilli(10)); + when(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)) + .thenReturn(Optional.of(leaderPartition)) + .thenThrow(RuntimeException.class); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(leaderScheduler); + + //Wait for more than a minute as the default while loop wait time in leader scheduler is 1 minute + Thread.sleep(100); + executorService.shutdownNow(); + + // Check if crawler was invoked and updated leader lease renewal time + verify(crawler, atLeast(2)).crawl(any(LeaderPartition.class), any(EnhancedSourceCoordinator.class)); + } + @Test void testWhileLoopRunnningAfterTheSleep() throws InterruptedException { LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, crawler); @@ -113,4 +164,18 @@ void testWhileLoopRunnningAfterTheSleep() throws InterruptedException { // Check if crawler was invoked and updated leader lease renewal time verify(crawler, atLeast(2)).crawl(any(LeaderPartition.class), any(EnhancedSourceCoordinator.class)); } + + @Test + void testSetLeaderProgressState_throwsExceptionOnStateMismatch() { + // Create a LeaderPartition with TokenPaginationCrawlerLeaderProgressState + LeaderPartition leaderPartition = new LeaderPartition(new TokenPaginationCrawlerLeaderProgressState("")); + + // Try to set a different type of state (PaginationCrawlerLeaderProgressState) + PaginationCrawlerLeaderProgressState incompatibleState = new PaginationCrawlerLeaderProgressState(Instant.now()); + + // Verify that RuntimeException is thrown due to state type mismatch + RuntimeException exception = assertThrows(RuntimeException.class, () -> { + leaderPartition.setLeaderProgressState(incompatibleState); + }); + } } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/TokenPaginationCrawlerLeaderProgressStateTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/TokenPaginationCrawlerLeaderProgressStateTest.java new file mode 100644 index 0000000000..8cf2eddfb4 --- /dev/null +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/state/TokenPaginationCrawlerLeaderProgressStateTest.java @@ -0,0 +1,43 @@ +package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state; + +import java.time.Instant; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + + +public class TokenPaginationCrawlerLeaderProgressStateTest { + + @Test + void testDeserializeTokenPaginationCrawlerLeaderProgressState_withTypeInfo() throws JsonProcessingException { + String json = "{\n" + + " \"@class\": \"org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState\",\n" + + " \"last_token\": \"sample-token-123\",\n" + + " \"initialized\": true\n" + + "}"; + + ObjectMapper objectMapper = new ObjectMapper() + .registerModule(new JavaTimeModule()) + .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + + TokenPaginationCrawlerLeaderProgressState state = objectMapper.readValue(json, TokenPaginationCrawlerLeaderProgressState.class); + assertEquals("sample-token-123", state.getLastToken()); + assertEquals(true, state.isInitialized()); + } + + @Test + void testConstructor_setsLastTokenCorrectly() { + String testToken = "sample-token-123"; + TokenPaginationCrawlerLeaderProgressState state = new TokenPaginationCrawlerLeaderProgressState(testToken); + state.setLastPollTime(Instant.now()); + + assertEquals(testToken, state.getLastToken()); + assertNotNull(state.getLastPollTime()); + } + +}