Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.base;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;

import java.util.Iterator;

/**
* Interface for Crawler client to support token-based pagination. This interface can
* be implemented by different saas clients.
*/
public interface TokenCrawlerClient<T extends SaasWorkerProgressState> extends CrawlerClient<PaginationCrawlerWorkerProgressState> {

/**
* This will be the main API called by crawler. This method assumes that {@link
* CrawlerSourceConfig} is available as a member to {@link CrawlerClient}, as a result of
* which, other scanning properties will also be available to this method
*
* @return returns an {@link Iterator} of {@link ItemInfo}
*/
Iterator<ItemInfo> listItems(String lastToken);

void executePartition(PaginationCrawlerWorkerProgressState state,
Buffer<Record<Event>> buffer,
AcknowledgementSet acknowledgementSet);
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.base;

import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")
Comment thread
dlvenable marked this conversation as resolved.
@JsonSubTypes({
@JsonSubTypes.Type(value = TokenPaginationCrawlerLeaderProgressState.class)
})
public interface TokenLeaderProgressState {

String getLastToken();

void setLastToken(String token);
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.base;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Named;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.LeaderScheduler.DEFAULT_EXTEND_LEASE_MINUTES;

@Named
public class TokenPaginationCrawler implements Crawler<PaginationCrawlerWorkerProgressState> {
private static final Logger log = LoggerFactory.getLogger(TokenPaginationCrawler.class);
private static final int batchSize = 50;
private static final String PAGINATION_WORKER_PARTITIONS_CREATED = "paginationWorkerPartitionsCreated";
private static final String INVALID_PAGINATION_ITEMS = "invalidPaginationItems";
private final Timer crawlingTimer;
private final CrawlerClient client;
private final Counter parititionsCreatedCounter;
private final Counter invalidPaginationItemsCounter;

public TokenPaginationCrawler(CrawlerClient client,
PluginMetrics pluginMetrics) {
this.client = client;
this.crawlingTimer = pluginMetrics.timer("crawlingTime");
this.parititionsCreatedCounter = pluginMetrics.counter(PAGINATION_WORKER_PARTITIONS_CREATED);
this.invalidPaginationItemsCounter = pluginMetrics.counter(INVALID_PAGINATION_ITEMS);

}

@Override
public Instant crawl(LeaderPartition leaderPartition,
EnhancedSourceCoordinator coordinator) {
long startTime = System.currentTimeMillis();
Instant lastLeaderSavedInstant = Instant.now();
TokenPaginationCrawlerLeaderProgressState leaderProgressState = (TokenPaginationCrawlerLeaderProgressState) leaderPartition
.getProgressState().get();
String lastToken = leaderProgressState.getLastToken();
Iterator<ItemInfo> itemInfoIterator = ((TokenCrawlerClient) client).listItems(lastToken);
String latestToken = lastToken;
log.info("Starting to crawl the source with last item ID: {}", lastToken);
do {
final List<ItemInfo> itemInfoList = new ArrayList<>();
for (int i = 0; i < batchSize && itemInfoIterator.hasNext(); i++) {
final ItemInfo nextItem = itemInfoIterator.next();
if (nextItem == null) {
//we don't expect null items, but just in case, we'll skip them
log.warn("Unexpected encounter of a null item while processing batch with last item ID " + lastToken);
invalidPaginationItemsCounter.increment();
continue;
}
itemInfoList.add(nextItem);
if (nextItem.getItemId() != null) {
latestToken = nextItem.getItemId();
}
}
if (!itemInfoList.isEmpty()) {
createPartition(itemInfoList, coordinator);
}
// Check point leader progress state at every minute interval.
Instant currentTimeInstance = Instant.now();
if (Duration.between(lastLeaderSavedInstant, currentTimeInstance).toMinutes() >= 1) {
// intermediate updates to master partition state
updateLeaderProgressState(leaderPartition, latestToken, coordinator);
lastLeaderSavedInstant = currentTimeInstance;
}
} while (itemInfoIterator.hasNext());
updateLeaderProgressState(leaderPartition, latestToken, coordinator);
long crawlTimeMillis = System.currentTimeMillis() - startTime;
log.debug("Crawling completed in {} ms", crawlTimeMillis);
crawlingTimer.record(crawlTimeMillis, TimeUnit.MILLISECONDS);
return Instant.now(); // Return current time as required by Crawler interface
}

public void executePartition(PaginationCrawlerWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
client.executePartition(state, buffer, acknowledgementSet);
}

private void updateLeaderProgressState(LeaderPartition leaderPartition, String updatedToken, EnhancedSourceCoordinator coordinator) {
TokenPaginationCrawlerLeaderProgressState leaderProgressState = (TokenPaginationCrawlerLeaderProgressState) leaderPartition
.getProgressState().get();
String oldToken = leaderProgressState.getLastToken();
leaderProgressState.setLastToken(updatedToken);
leaderPartition.setLeaderProgressState(leaderProgressState);
coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
log.info("Updated leader progress state: old lastToken={}, new lastToken={}", oldToken, updatedToken);
}

private void createPartition(List<ItemInfo> itemInfoList, EnhancedSourceCoordinator coordinator) {
if (itemInfoList.isEmpty()) {
return;
}
ItemInfo itemInfo = itemInfoList.get(0);
String partitionKey = itemInfo.getPartitionKey();
List<String> itemIds = itemInfoList.stream().map(ItemInfo::getId).collect(Collectors.toList());
PaginationCrawlerWorkerProgressState state = new PaginationCrawlerWorkerProgressState();
state.setKeyAttributes(itemInfo.getKeyAttributes());
state.setItemIds(itemIds);
state.setExportStartTime(Instant.now());
state.setLoadedItems(itemInfoList.size());
SaasSourcePartition sourcePartition = new SaasSourcePartition(state, partitionKey);
coordinator.createPartition(sourcePartition);
parititionsCreatedCounter.increment();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition;

import org.opensearch.dataprepper.plugins.source.source_crawler.base.TokenLeaderProgressState;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -56,7 +57,22 @@ public Optional<LeaderProgressState> getProgressState() {
}

public void setLeaderProgressState(LeaderProgressState state) {
this.state.setLastPollTime(state.getLastPollTime());
boolean stateIsToken = state instanceof TokenLeaderProgressState;
boolean thisStateIsToken = this.state instanceof TokenLeaderProgressState;

if (stateIsToken != thisStateIsToken) {
// Validate that the states are not inconsistent
// We don't expect to reach here.
throw new RuntimeException("Leader partition progress state type mismatch: " +
"Provided state type: " + state.getClass().getSimpleName() +
Comment thread
dlvenable marked this conversation as resolved.
", Current state type: " + this.state.getClass().getSimpleName());
}

if (state instanceof TokenLeaderProgressState) {
((TokenLeaderProgressState) this.state).setLastToken(((TokenLeaderProgressState) state).getLastToken());
} else {
this.state.setLastPollTime(state.getLastPollTime());
}
}

/**
Expand All @@ -77,4 +93,4 @@ public LeaderProgressState convertToPartitionState(final String serializedPartit
return null;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state;

import org.opensearch.dataprepper.plugins.source.source_crawler.base.TokenLeaderProgressState;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.LeaderProgressState;

import java.time.Instant;

@Data
@NoArgsConstructor
public class TokenPaginationCrawlerLeaderProgressState implements TokenLeaderProgressState, LeaderProgressState {

@JsonProperty("initialized")
private boolean initialized = false;

@JsonProperty("last_token")
private String lastToken;

private Instant lastPollTime;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is also needed here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because TokenPaginationCrawlerLeaderProgressState needs to implement LeaderProgressState which has required abstract method setLastPollTime for compatibility with existing PaginationCrawler.


public TokenPaginationCrawlerLeaderProgressState(@JsonProperty("last_token") final String lastToken) {
this.lastToken = lastToken;
}
}

Loading
Loading