Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class LeaderOnlyTokenCrawler implements Crawler<SaasWorkerProgressState>
public static final String ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME = "acknowledgementSetSuccesses";
public static final String ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME = "acknowledgementSetFailures";

private final LeaderOnlyTokenCrawlerClient client;
private final CrawlerClient client;
private final Timer crawlingTimer;
private final PluginMetrics pluginMetrics;
@Setter
Expand All @@ -58,7 +58,7 @@ public class LeaderOnlyTokenCrawler implements Crawler<SaasWorkerProgressState>
private Duration noAckTimeout;

public LeaderOnlyTokenCrawler(
LeaderOnlyTokenCrawlerClient client,
CrawlerClient client,
PluginMetrics pluginMetrics) {
this.client = client;
this.pluginMetrics = pluginMetrics;
Expand All @@ -82,7 +82,7 @@ public Instant crawl(LeaderPartition leaderPartition,

log.info("Starting leader-only crawl with token: {}", lastToken);

Iterator<ItemInfo> itemIterator = client.listItems(lastToken);
Iterator<ItemInfo> itemIterator = ((LeaderOnlyTokenCrawlerClient) client).listItems(lastToken);

while (itemIterator.hasNext() && !shouldStopCrawl) {
List<ItemInfo> batch = collectBatch(itemIterator);
Expand Down Expand Up @@ -161,7 +161,7 @@ private void processBatch(List<ItemInfo> batch,

bufferWriteTimer.record(() -> {
try {
client.writeBatchToBuffer(batch, buffer, acknowledgementSet);
((LeaderOnlyTokenCrawlerClient) client).writeBatchToBuffer(batch, buffer, acknowledgementSet);
acknowledgementSet.complete();
// Check every 15 seconds until either:
// 1. We get an ack (positive/negative)
Expand Down Expand Up @@ -191,7 +191,7 @@ private void processBatch(List<ItemInfo> batch,
// Write directly and update checkpoint
bufferWriteTimer.record(() -> {
try {
client.writeBatchToBuffer(batch, buffer, null);
((LeaderOnlyTokenCrawlerClient) client).writeBatchToBuffer(batch, buffer, null);
updateLeaderProgressState(leaderPartition, lastToken, coordinator);
} catch (Exception e) {
log.error("Failed to write batch to buffer", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* This interface adds additional method for direct buffer writing,
* optimized for single-leader processing without worker partitions.
*/
public interface LeaderOnlyTokenCrawlerClient extends TokenCrawlerClient<PaginationCrawlerWorkerProgressState> {
public interface LeaderOnlyTokenCrawlerClient<T extends SaasWorkerProgressState> extends TokenCrawlerClient<PaginationCrawlerWorkerProgressState> {
/**
* Writes a batch of items directly to the buffer.
*
Expand Down
Loading