Skip to content

Commit f9b45fc

Browse files
author
Alekhya Parisha
committed
Implement LeaderOnlyTokenCrawler
Signed-off-by: Alekhya Parisha <aparisha@amazon.com>
1 parent 8dbdfeb commit f9b45fc

3 files changed

Lines changed: 358 additions & 0 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package org.opensearch.dataprepper.plugins.source.source_crawler.base;
2+
3+
import io.micrometer.core.instrument.Counter;
4+
import io.micrometer.core.instrument.Timer;
5+
import lombok.Setter;
6+
import org.opensearch.dataprepper.metrics.PluginMetrics;
7+
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
8+
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
9+
import org.opensearch.dataprepper.model.buffer.Buffer;
10+
import org.opensearch.dataprepper.model.event.Event;
11+
import org.opensearch.dataprepper.model.record.Record;
12+
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
13+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
14+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState;
15+
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
18+
19+
import javax.inject.Named;
20+
import java.time.Duration;
21+
import java.time.Instant;
22+
import java.util.ArrayList;
23+
import java.util.Iterator;
24+
import java.util.List;
25+
26+
@Named
27+
public class LeaderOnlyTokenCrawler implements Crawler {
28+
private static final Logger log = LoggerFactory.getLogger(LeaderOnlyTokenCrawler.class);
29+
private static final Duration BUFFER_WRITE_TIMEOUT = Duration.ofSeconds(15);
30+
private static final Duration CHECKPOINT_INTERVAL = Duration.ofMinutes(1);
31+
private static final Duration DEFAULT_LEASE_DURATION = Duration.ofMinutes(15);
32+
private static final int BATCH_SIZE = 50;
33+
34+
private static final String METRIC_BATCHES_FAILED = "batchesFailed";
35+
private static final String METRIC_BUFFER_WRITE_TIME = "bufferWriteTime";
36+
37+
private final LeaderOnlyTokenCrawlerClient client;
38+
private final PluginMetrics pluginMetrics;
39+
@Setter
40+
private boolean acknowledgementsEnabled;
41+
@Setter
42+
private AcknowledgementSetManager acknowledgementSetManager;
43+
private final Counter batchesFailedCounter;
44+
private final Timer bufferWriteTimer;
45+
46+
private String lastToken;
47+
48+
public LeaderOnlyTokenCrawler(
49+
LeaderOnlyTokenCrawlerClient client,
50+
PluginMetrics pluginMetrics) {
51+
this.client = client;
52+
this.pluginMetrics = pluginMetrics;
53+
54+
this.batchesFailedCounter = pluginMetrics.counter(METRIC_BATCHES_FAILED);
55+
this.bufferWriteTimer = pluginMetrics.timer(METRIC_BUFFER_WRITE_TIME);
56+
}
57+
58+
@Override
59+
public Instant crawl(LeaderPartition leaderPartition,
60+
EnhancedSourceCoordinator coordinator) {
61+
long startTime = System.currentTimeMillis();
62+
Instant lastCheckpointTime = Instant.now();
63+
64+
try {
65+
TokenPaginationCrawlerLeaderProgressState leaderProgressState =
66+
(TokenPaginationCrawlerLeaderProgressState) leaderPartition.getProgressState().get();
67+
lastToken = leaderProgressState.getLastToken();
68+
69+
log.info("Starting leader-only crawl with token: {}", lastToken);
70+
71+
Iterator<ItemInfo> itemIterator = client.listItems(lastToken);
72+
73+
while (itemIterator.hasNext()) {
74+
List<ItemInfo> batch = collectBatch(itemIterator);
75+
if (batch.isEmpty()) {
76+
continue;
77+
}
78+
79+
ItemInfo lastItem = batch.get(batch.size() - 1);
80+
lastToken = lastItem.getItemId();
81+
82+
try {
83+
processBatch(batch, leaderPartition, coordinator);
84+
} catch (Exception e) {
85+
batchesFailedCounter.increment();
86+
log.error("Failed to process batch ending with token {}", lastToken, e);
87+
throw e;
88+
}
89+
90+
// Periodic checkpoint if not using acknowledgments
91+
if (!acknowledgementsEnabled &&
92+
Duration.between(lastCheckpointTime, Instant.now()).compareTo(CHECKPOINT_INTERVAL) >= 0) {
93+
updateLeaderProgressState(leaderPartition, lastToken, coordinator);
94+
lastCheckpointTime = Instant.now();
95+
}
96+
}
97+
98+
// Final flush of any remaining items
99+
if (!acknowledgementsEnabled) {
100+
updateLeaderProgressState(leaderPartition, lastToken, coordinator);
101+
}
102+
103+
} catch (Exception e) {
104+
log.error("Error during crawl operation", e);
105+
throw new RuntimeException("Crawl operation failed", e);
106+
}
107+
108+
log.info("Crawl completed in {} ms", System.currentTimeMillis() - startTime);
109+
return Instant.now();
110+
}
111+
112+
@Override
113+
public void executePartition(SaasWorkerProgressState state, Buffer buffer, AcknowledgementSet acknowledgementSet) {
114+
115+
}
116+
117+
private List<ItemInfo> collectBatch(Iterator<ItemInfo> iterator) {
118+
List<ItemInfo> batch = new ArrayList<>();
119+
for (int i = 0; i < BATCH_SIZE && iterator.hasNext(); i++) {
120+
ItemInfo item = iterator.next();
121+
if (item != null) {
122+
batch.add(item);
123+
}
124+
}
125+
return batch;
126+
}
127+
128+
public void setBuffer(Buffer<Record<Event>> buffer) {
129+
client.setBuffer(buffer);
130+
}
131+
132+
private void processBatch(List<ItemInfo> batch,
133+
LeaderPartition leaderPartition,
134+
EnhancedSourceCoordinator coordinator) {
135+
if (acknowledgementsEnabled && acknowledgementSetManager != null) {
136+
AcknowledgementSet acknowledgementSet = acknowledgementSetManager.create(
137+
success -> {
138+
if (success) {
139+
// On success: update checkpoint
140+
updateLeaderProgressState(leaderPartition, lastToken, coordinator);
141+
} else {
142+
// On failure: give up partition
143+
log.error("Batch processing failed for token: {}", lastToken);
144+
coordinator.giveUpPartition(leaderPartition);
145+
}
146+
},
147+
BUFFER_WRITE_TIMEOUT
148+
);
149+
150+
bufferWriteTimer.record(() -> {
151+
try {
152+
client.writeBatchToBuffer(batch, acknowledgementSet);
153+
acknowledgementSet.complete();
154+
} catch (Exception e) {
155+
log.error("Failed to write batch to buffer", e);
156+
acknowledgementSet.complete(); // This will trigger the failure callback
157+
throw e;
158+
}
159+
});
160+
} else {
161+
// Without Acknowledgments:
162+
// Write directly and update checkpoint
163+
bufferWriteTimer.record(() -> {
164+
try {
165+
client.writeBatchToBuffer(batch, null);
166+
updateLeaderProgressState(leaderPartition, lastToken, coordinator);
167+
} catch (Exception e) {
168+
log.error("Failed to write batch to buffer", e);
169+
throw e;
170+
}
171+
});
172+
}
173+
}
174+
175+
private void updateLeaderProgressState(LeaderPartition leaderPartition,
176+
String updatedToken,
177+
EnhancedSourceCoordinator coordinator) {
178+
TokenPaginationCrawlerLeaderProgressState leaderProgressState =
179+
(TokenPaginationCrawlerLeaderProgressState) leaderPartition.getProgressState().get();
180+
String oldToken = leaderProgressState.getLastToken();
181+
leaderProgressState.setLastToken(updatedToken);
182+
leaderPartition.setLeaderProgressState(leaderProgressState);
183+
coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_LEASE_DURATION);
184+
log.info("Updated leader progress state: old lastToken={}, new lastToken={}", oldToken, updatedToken);
185+
}
186+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package org.opensearch.dataprepper.plugins.source.source_crawler.base;
2+
3+
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
4+
import org.opensearch.dataprepper.model.buffer.Buffer;
5+
import org.opensearch.dataprepper.model.event.Event;
6+
import org.opensearch.dataprepper.model.record.Record;
7+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState;
8+
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
9+
10+
import java.util.List;
11+
12+
/**
13+
* Interface for leader-only token-based crawler client that extends TokenCrawlerClient.
14+
* This interface adds additional methods for direct buffer writing and buffer management,
15+
* optimized for single-leader processing without worker partitions.
16+
*/
17+
public interface LeaderOnlyTokenCrawlerClient extends TokenCrawlerClient<PaginationCrawlerWorkerProgressState> {
18+
/**
19+
* Writes a batch of items directly to the buffer.
20+
*
21+
* @param items The batch of items to write
22+
* @param acknowledgementSet Optional acknowledgment set for tracking write completion.
23+
* If provided, items will be added to this set for acknowledgment tracking.
24+
* Can be null if acknowledgments are disabled.
25+
*/
26+
void writeBatchToBuffer(List<ItemInfo> items, AcknowledgementSet acknowledgementSet);
27+
28+
/**
29+
* Sets the buffer for writing events. Must be called before any buffer operations.
30+
* This method initializes the buffer that will be used by writeBatchToBuffer.
31+
*
32+
* @param buffer The buffer to write events to
33+
*/
34+
void setBuffer(Buffer<Record<Event>> buffer);
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package org.opensearch.dataprepper.plugins.source.source_crawler.base;
2+
3+
import org.junit.jupiter.api.BeforeEach;
4+
import org.junit.jupiter.api.Test;
5+
import org.junit.jupiter.api.extension.ExtendWith;
6+
import org.mockito.Mock;
7+
import org.mockito.junit.jupiter.MockitoExtension;
8+
import org.opensearch.dataprepper.metrics.PluginMetrics;
9+
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
10+
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
11+
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
12+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
13+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState;
14+
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
15+
import org.opensearch.dataprepper.plugins.source.source_crawler.model.TestItemInfo;
16+
17+
import java.time.Duration;
18+
import java.time.Instant;
19+
import java.util.ArrayList;
20+
import java.util.Collections;
21+
import java.util.HashMap;
22+
import java.util.List;
23+
import java.util.Optional;
24+
25+
import static org.junit.jupiter.api.Assertions.assertEquals;
26+
import static org.junit.jupiter.api.Assertions.assertThrows;
27+
import static org.mockito.ArgumentMatchers.any;
28+
import static org.mockito.ArgumentMatchers.eq;
29+
import static org.mockito.Mockito.verify;
30+
import static org.mockito.Mockito.when;
31+
import static org.mockito.Mockito.doThrow;
32+
import static org.mockito.Mockito.never;
33+
34+
import static org.mockito.internal.verification.VerificationModeFactory.times;
35+
36+
@ExtendWith(MockitoExtension.class)
37+
class LeaderOnlyTokenCrawlerTest {
38+
private static final int BATCH_SIZE = 50;
39+
private static final String INITIAL_TOKEN = "initial-token";
40+
private static final Duration BUFFER_WRITE_TIMEOUT = Duration.ofSeconds(15);
41+
42+
@Mock
43+
private LeaderOnlyTokenCrawlerClient client;
44+
@Mock
45+
private EnhancedSourceCoordinator coordinator;
46+
@Mock
47+
private LeaderPartition leaderPartition;
48+
@Mock
49+
private AcknowledgementSetManager acknowledgementSetManager;
50+
@Mock
51+
private AcknowledgementSet acknowledgementSet;
52+
53+
private LeaderOnlyTokenCrawler crawler;
54+
private final PluginMetrics pluginMetrics = PluginMetrics.fromNames("CrawlerTest", "crawler");
55+
56+
@BeforeEach
57+
void setup() {
58+
crawler = new LeaderOnlyTokenCrawler(client, pluginMetrics);
59+
crawler.setAcknowledgementSetManager(acknowledgementSetManager);
60+
when(leaderPartition.getProgressState())
61+
.thenReturn(Optional.of(new TokenPaginationCrawlerLeaderProgressState(INITIAL_TOKEN)));
62+
}
63+
64+
@Test
65+
void testCrawlWithEmptyList() {
66+
when(client.listItems(INITIAL_TOKEN)).thenReturn(Collections.emptyIterator());
67+
crawler.crawl(leaderPartition, coordinator);
68+
verify(client, never()).writeBatchToBuffer(any(), any());
69+
}
70+
71+
@Test
72+
void testSimpleCrawl() {
73+
List<ItemInfo> items = createTestItems(1);
74+
when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
75+
crawler.crawl(leaderPartition, coordinator);
76+
verify(client).writeBatchToBuffer(items, null);
77+
}
78+
79+
@Test
80+
void testCrawlWithAcknowledgmentsEnabled() {
81+
List<ItemInfo> items = createTestItems(1);
82+
when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
83+
when(acknowledgementSetManager.create(any(), any(Duration.class)))
84+
.thenReturn(acknowledgementSet);
85+
86+
crawler.setAcknowledgementsEnabled(true);
87+
crawler.crawl(leaderPartition, coordinator);
88+
89+
verify(client).writeBatchToBuffer(items, acknowledgementSet);
90+
verify(acknowledgementSet).complete();
91+
}
92+
93+
@Test
94+
void testCrawlWithMultipleBatches() {
95+
List<ItemInfo> items = createTestItems(BATCH_SIZE + 1); // Create more than one batch
96+
when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
97+
98+
crawler.setAcknowledgementsEnabled(false);
99+
crawler.crawl(leaderPartition, coordinator);
100+
101+
verify(client, times(2)).writeBatchToBuffer(any(), any());
102+
}
103+
104+
@Test
105+
void testBufferWriteFailure() {
106+
List<ItemInfo> items = createTestItems(1);
107+
when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
108+
doThrow(new RuntimeException("Buffer write failed"))
109+
.when(client).writeBatchToBuffer(any(), any());
110+
111+
assertThrows(RuntimeException.class, () ->
112+
crawler.crawl(leaderPartition, coordinator));
113+
}
114+
115+
@Test
116+
void testProgressStateUpdate() {
117+
List<ItemInfo> items = createTestItems(1);
118+
String newToken = "id0"; // This will be the ID of the created test item
119+
when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
120+
121+
crawler.setAcknowledgementsEnabled(false);
122+
crawler.crawl(leaderPartition, coordinator);
123+
124+
verify(coordinator, times(2)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));
125+
TokenPaginationCrawlerLeaderProgressState state =
126+
(TokenPaginationCrawlerLeaderProgressState) leaderPartition.getProgressState().get();
127+
assertEquals(newToken, state.getLastToken());
128+
}
129+
130+
private List<ItemInfo> createTestItems(int count) {
131+
List<ItemInfo> items = new ArrayList<>();
132+
for (int i = 0; i < count; i++) {
133+
items.add(new TestItemInfo("id" + i, new HashMap<>(), Instant.now()));
134+
}
135+
return items;
136+
}
137+
}

0 commit comments

Comments
 (0)