Skip to content

Commit 8e37ee5

Browse files
bbenner7635 and Brendan Benner authored
Add TokenPaginationCrawler for SAAS plugins (#6008)
Add TokenPaginationCrawler for SAAS plugins Signed-off-by: Brendan Benner <bbenner@amazon.com> Co-authored-by: Brendan Benner <bbenner@amazon.com>
1 parent decca13 commit 8e37ee5

8 files changed

Lines changed: 490 additions & 2 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package org.opensearch.dataprepper.plugins.source.source_crawler.base;
2+
3+
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
4+
import org.opensearch.dataprepper.model.buffer.Buffer;
5+
import org.opensearch.dataprepper.model.event.Event;
6+
import org.opensearch.dataprepper.model.record.Record;
7+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState;
8+
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
9+
10+
import java.util.Iterator;
11+
12+
/**
13+
* Interface for Crawler client to support token-based pagination. This interface can
14+
* be implemented by different saas clients.
*
* NOTE(review): the type parameter {@code T} is declared but not referenced by the
* super-interface or by either method visible here — confirm whether it is still needed.
15+
*/
16+
public interface TokenCrawlerClient<T extends SaasWorkerProgressState> extends CrawlerClient<PaginationCrawlerWorkerProgressState> {
17+
18+
/**
19+
* This will be the main API called by crawler. This method assumes that {@link
20+
* CrawlerSourceConfig} is available as a member to {@link CrawlerClient}, as a result of
21+
* which, other scanning properties will also be available to this method
22+
*
* @param lastToken the token to list from; {@code TokenPaginationCrawler} passes the
*                  last token stored in the leader progress state. NOTE(review): that value
*                  appears to be unset (null) before the first checkpoint — confirm how
*                  implementations should treat a null token.
23+
* @return returns an {@link Iterator} of {@link ItemInfo}
24+
*/
25+
Iterator<ItemInfo> listItems(String lastToken);
26+
27+
/**
* Processes one worker partition described by {@code state}.
*
* @param state              progress state of the worker partition to execute
* @param buffer             buffer of {@link Record} of {@link Event}; presumably the
*                           destination for the produced events — confirm with implementations
* @param acknowledgementSet acknowledgement set supplied by the caller for this partition
*/
void executePartition(PaginationCrawlerWorkerProgressState state,
28+
Buffer<Record<Event>> buffer,
29+
AcknowledgementSet acknowledgementSet);
30+
}
31+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package org.opensearch.dataprepper.plugins.source.source_crawler.base;
2+
3+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState;
4+
import com.fasterxml.jackson.annotation.JsonSubTypes;
5+
import com.fasterxml.jackson.annotation.JsonTypeInfo;
6+
7+
/**
* Leader progress state that tracks crawl position as a token string.
*
* The Jackson annotations below record the concrete class name in an {@code "@class"}
* property and register {@link TokenPaginationCrawlerLeaderProgressState} as a subtype.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")
8+
@JsonSubTypes({
9+
@JsonSubTypes.Type(value = TokenPaginationCrawlerLeaderProgressState.class)
10+
})
11+
public interface TokenLeaderProgressState {
12+
13+
/**
* @return the last token recorded in this state
*/
String getLastToken();
14+
15+
/**
* @param token the token to record as the last one
*/
void setLastToken(String token);
16+
}
17+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package org.opensearch.dataprepper.plugins.source.source_crawler.base;
2+
3+
import io.micrometer.core.instrument.Counter;
4+
import io.micrometer.core.instrument.Timer;
5+
import org.opensearch.dataprepper.metrics.PluginMetrics;
6+
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
7+
import org.opensearch.dataprepper.model.buffer.Buffer;
8+
import org.opensearch.dataprepper.model.event.Event;
9+
import org.opensearch.dataprepper.model.record.Record;
10+
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
11+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
12+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition;
13+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState;
14+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState;
15+
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
18+
19+
import javax.inject.Named;
20+
import java.time.Duration;
21+
import java.time.Instant;
22+
import java.util.ArrayList;
23+
import java.util.Iterator;
24+
import java.util.List;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.stream.Collectors;
27+
28+
import static org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.LeaderScheduler.DEFAULT_EXTEND_LEASE_MINUTES;
29+
30+
@Named
31+
public class TokenPaginationCrawler implements Crawler<PaginationCrawlerWorkerProgressState> {
32+
private static final Logger log = LoggerFactory.getLogger(TokenPaginationCrawler.class);
33+
private static final int batchSize = 50;
34+
private static final String PAGINATION_WORKER_PARTITIONS_CREATED = "paginationWorkerPartitionsCreated";
35+
private static final String INVALID_PAGINATION_ITEMS = "invalidPaginationItems";
36+
private final Timer crawlingTimer;
37+
private final CrawlerClient client;
38+
private final Counter parititionsCreatedCounter;
39+
private final Counter invalidPaginationItemsCounter;
40+
41+
public TokenPaginationCrawler(CrawlerClient client,
42+
PluginMetrics pluginMetrics) {
43+
this.client = client;
44+
this.crawlingTimer = pluginMetrics.timer("crawlingTime");
45+
this.parititionsCreatedCounter = pluginMetrics.counter(PAGINATION_WORKER_PARTITIONS_CREATED);
46+
this.invalidPaginationItemsCounter = pluginMetrics.counter(INVALID_PAGINATION_ITEMS);
47+
48+
}
49+
50+
@Override
51+
public Instant crawl(LeaderPartition leaderPartition,
52+
EnhancedSourceCoordinator coordinator) {
53+
long startTime = System.currentTimeMillis();
54+
Instant lastLeaderSavedInstant = Instant.now();
55+
TokenPaginationCrawlerLeaderProgressState leaderProgressState = (TokenPaginationCrawlerLeaderProgressState) leaderPartition
56+
.getProgressState().get();
57+
String lastToken = leaderProgressState.getLastToken();
58+
Iterator<ItemInfo> itemInfoIterator = ((TokenCrawlerClient) client).listItems(lastToken);
59+
String latestToken = lastToken;
60+
log.info("Starting to crawl the source with last item ID: {}", lastToken);
61+
do {
62+
final List<ItemInfo> itemInfoList = new ArrayList<>();
63+
for (int i = 0; i < batchSize && itemInfoIterator.hasNext(); i++) {
64+
final ItemInfo nextItem = itemInfoIterator.next();
65+
if (nextItem == null) {
66+
//we don't expect null items, but just in case, we'll skip them
67+
log.warn("Unexpected encounter of a null item while processing batch with last item ID " + lastToken);
68+
invalidPaginationItemsCounter.increment();
69+
continue;
70+
}
71+
itemInfoList.add(nextItem);
72+
if (nextItem.getItemId() != null) {
73+
latestToken = nextItem.getItemId();
74+
}
75+
}
76+
if (!itemInfoList.isEmpty()) {
77+
createPartition(itemInfoList, coordinator);
78+
}
79+
// Check point leader progress state at every minute interval.
80+
Instant currentTimeInstance = Instant.now();
81+
if (Duration.between(lastLeaderSavedInstant, currentTimeInstance).toMinutes() >= 1) {
82+
// intermediate updates to master partition state
83+
updateLeaderProgressState(leaderPartition, latestToken, coordinator);
84+
lastLeaderSavedInstant = currentTimeInstance;
85+
}
86+
} while (itemInfoIterator.hasNext());
87+
updateLeaderProgressState(leaderPartition, latestToken, coordinator);
88+
long crawlTimeMillis = System.currentTimeMillis() - startTime;
89+
log.debug("Crawling completed in {} ms", crawlTimeMillis);
90+
crawlingTimer.record(crawlTimeMillis, TimeUnit.MILLISECONDS);
91+
return Instant.now(); // Return current time as required by Crawler interface
92+
}
93+
94+
public void executePartition(PaginationCrawlerWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
95+
client.executePartition(state, buffer, acknowledgementSet);
96+
}
97+
98+
private void updateLeaderProgressState(LeaderPartition leaderPartition, String updatedToken, EnhancedSourceCoordinator coordinator) {
99+
TokenPaginationCrawlerLeaderProgressState leaderProgressState = (TokenPaginationCrawlerLeaderProgressState) leaderPartition
100+
.getProgressState().get();
101+
String oldToken = leaderProgressState.getLastToken();
102+
leaderProgressState.setLastToken(updatedToken);
103+
leaderPartition.setLeaderProgressState(leaderProgressState);
104+
coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
105+
log.info("Updated leader progress state: old lastToken={}, new lastToken={}", oldToken, updatedToken);
106+
}
107+
108+
private void createPartition(List<ItemInfo> itemInfoList, EnhancedSourceCoordinator coordinator) {
109+
if (itemInfoList.isEmpty()) {
110+
return;
111+
}
112+
ItemInfo itemInfo = itemInfoList.get(0);
113+
String partitionKey = itemInfo.getPartitionKey();
114+
List<String> itemIds = itemInfoList.stream().map(ItemInfo::getId).collect(Collectors.toList());
115+
PaginationCrawlerWorkerProgressState state = new PaginationCrawlerWorkerProgressState();
116+
state.setKeyAttributes(itemInfo.getKeyAttributes());
117+
state.setItemIds(itemIds);
118+
state.setExportStartTime(Instant.now());
119+
state.setLoadedItems(itemInfoList.size());
120+
SaasSourcePartition sourcePartition = new SaasSourcePartition(state, partitionKey);
121+
coordinator.createPartition(sourcePartition);
122+
parititionsCreatedCounter.increment();
123+
}
124+
}

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/partition/LeaderPartition.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition;
22

3+
import org.opensearch.dataprepper.plugins.source.source_crawler.base.TokenLeaderProgressState;
34
import com.fasterxml.jackson.core.JsonFactory;
45
import com.fasterxml.jackson.core.JsonProcessingException;
56
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -56,7 +57,22 @@ public Optional<LeaderProgressState> getProgressState() {
5657
}
5758

5859
public void setLeaderProgressState(LeaderProgressState state) {
59-
this.state.setLastPollTime(state.getLastPollTime());
60+
boolean stateIsToken = state instanceof TokenLeaderProgressState;
61+
boolean thisStateIsToken = this.state instanceof TokenLeaderProgressState;
62+
63+
if (stateIsToken != thisStateIsToken) {
64+
// Validate that the states are not inconsistent
65+
// We don't expect to reach here.
66+
throw new RuntimeException("Leader partition progress state type mismatch: " +
67+
"Provided state type: " + state.getClass().getSimpleName() +
68+
", Current state type: " + this.state.getClass().getSimpleName());
69+
}
70+
71+
if (state instanceof TokenLeaderProgressState) {
72+
((TokenLeaderProgressState) this.state).setLastToken(((TokenLeaderProgressState) state).getLastToken());
73+
} else {
74+
this.state.setLastPollTime(state.getLastPollTime());
75+
}
6076
}
6177

6278
/**
@@ -77,4 +93,4 @@ public LeaderProgressState convertToPartitionState(final String serializedPartit
7793
return null;
7894
}
7995
}
80-
}
96+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state;
2+
3+
import org.opensearch.dataprepper.plugins.source.source_crawler.base.TokenLeaderProgressState;
4+
import com.fasterxml.jackson.annotation.JsonProperty;
5+
import lombok.Data;
6+
import lombok.NoArgsConstructor;
7+
import org.opensearch.dataprepper.plugins.source.source_crawler.base.LeaderProgressState;
8+
9+
import java.time.Instant;
10+
11+
/**
* Leader progress state for token-based pagination crawling. It carries the last token
* alongside the last poll time and an initialized flag. Accessors are not written out in
* this class; the Lombok {@code @Data} annotation is applied instead.
*/
@Data
12+
@NoArgsConstructor
13+
public class TokenPaginationCrawlerLeaderProgressState implements TokenLeaderProgressState, LeaderProgressState {
14+
15+
// Mapped to the JSON property "initialized"; starts out false.
@JsonProperty("initialized")
16+
private boolean initialized = false;
17+
18+
// Mapped to the JSON property "last_token"; left unset by the no-arg constructor.
@JsonProperty("last_token")
19+
private String lastToken;
20+
21+
// No explicit @JsonProperty name is given for this field.
private Instant lastPollTime;
22+
23+
/**
* Creates a state that starts from the given token; the other fields keep their defaults.
* NOTE(review): there is no @JsonCreator on this constructor — confirm which constructor
* Jackson is expected to use when deserializing.
*
* @param lastToken the token to start from
*/
public TokenPaginationCrawlerLeaderProgressState(@JsonProperty("last_token") final String lastToken) {
24+
this.lastToken = lastToken;
25+
}
26+
}
27+

0 commit comments

Comments
 (0)