Skip to content

Commit f12da2d

Browse files
author
Alekhya Parisha
committed
Add worker retry mechanism for failed batch processing in LeaderOnlyTokenCrawler
Signed-off-by: Alekhya Parisha <aparisha@amazon.com>
1 parent 2c3b8e0 commit f12da2d

2 files changed

Lines changed: 102 additions & 28 deletions

File tree

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/LeaderOnlyTokenCrawler.java

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import org.opensearch.dataprepper.model.record.Record;
1313
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
1414
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
15+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition;
16+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState;
1517
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState;
1618
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
1719
import org.slf4j.Logger;
@@ -25,9 +27,10 @@
2527
import java.util.List;
2628
import java.util.concurrent.TimeUnit;
2729
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.stream.Collectors;
2831

2932
@Named
30-
public class LeaderOnlyTokenCrawler implements Crawler<SaasWorkerProgressState> {
33+
public class LeaderOnlyTokenCrawler implements Crawler<PaginationCrawlerWorkerProgressState> {
3134
private static final Logger log = LoggerFactory.getLogger(LeaderOnlyTokenCrawler.class);
3235
private static final Duration NO_ACK_TIME_OUT_SECONDS = Duration.ofSeconds(900);
3336
private static final Duration CHECKPOINT_INTERVAL = Duration.ofMinutes(1);
@@ -54,7 +57,6 @@ public class LeaderOnlyTokenCrawler implements Crawler<SaasWorkerProgressState>
5457
private final Timer bufferWriteTimer;
5558

5659
private String lastToken;
57-
private boolean shouldStopCrawl = false;
5860
private Duration noAckTimeout;
5961

6062
public LeaderOnlyTokenCrawler(
@@ -73,7 +75,6 @@ public LeaderOnlyTokenCrawler(
7375
@Override
7476
public Instant crawl(LeaderPartition leaderPartition,
7577
EnhancedSourceCoordinator coordinator) {
76-
shouldStopCrawl = false;
7778
long startTime = System.currentTimeMillis();
7879
Instant lastCheckpointTime = Instant.now();
7980
TokenPaginationCrawlerLeaderProgressState leaderProgressState =
@@ -84,7 +85,7 @@ public Instant crawl(LeaderPartition leaderPartition,
8485

8586
Iterator<ItemInfo> itemIterator = ((LeaderOnlyTokenCrawlerClient) client).listItems(lastToken);
8687

87-
while (itemIterator.hasNext() && !shouldStopCrawl) {
88+
while (itemIterator.hasNext()) {
8889
List<ItemInfo> batch = collectBatch(itemIterator);
8990
if (batch.isEmpty()) {
9091
continue;
@@ -121,8 +122,8 @@ public Instant crawl(LeaderPartition leaderPartition,
121122
}
122123

123124
@Override
124-
public void executePartition(SaasWorkerProgressState state, Buffer buffer, AcknowledgementSet acknowledgementSet) {
125-
125+
public void executePartition(PaginationCrawlerWorkerProgressState state, Buffer buffer, AcknowledgementSet acknowledgementSet) {
126+
client.executePartition(state, buffer, acknowledgementSet);
126127
}
127128

128129
private List<ItemInfo> collectBatch(Iterator<ItemInfo> iterator) {
@@ -148,12 +149,12 @@ private void processBatch(List<ItemInfo> batch,
148149
if (success) {
149150
// On success: update checkpoint
150151
acknowledgementSetSuccesses.increment();
151-
updateLeaderProgressState(leaderPartition, lastToken, coordinator);
152152
} else {
153-
// On failure: Stop the crawl
153+
// On failure: Create a retry partition
154154
acknowledgementSetFailures.increment();
155-
log.warn("Batch processing received negative acknowledgment for token: {}. Stopping current crawl.", lastToken);
156-
shouldStopCrawl = true;
155+
log.warn("Batch processing received negative acknowledgment for token: {}. Creating retry " +
156+
"partition.", lastToken);
157+
createRetryPartition(batch, coordinator);
157158
}
158159
},
159160
noAckTimeout
@@ -172,17 +173,19 @@ private void processBatch(List<ItemInfo> batch,
172173

173174
if (!ackWaitDuration.minus(noAckTimeout).isNegative()) {
174175
// No ack received within NO_ACK_TIME_OUT_SECONDS
175-
log.warn("Acknowledgment not received for batch with token {} past wait time. Stopping current crawl.", lastToken);
176-
shouldStopCrawl = true;
176+
log.warn("No acknowledgment received for batch with token: {}. Creating retry partition.", lastToken);
177+
createRetryPartition(batch, coordinator);
177178
break;
178179
}
179180
}
181+
updateLeaderProgressState(leaderPartition, lastToken, coordinator);
180182
} catch (InterruptedException e) {
181183
Thread.currentThread().interrupt();
182184
throw new RuntimeException("Interrupted while waiting for acknowledgment", e);
183185
} catch (Exception e) {
184186
log.error("Failed to process batch ending with token {}", lastToken, e);
185187
acknowledgementSet.complete();
188+
createRetryPartition(batch, coordinator);
186189
throw e;
187190
}
188191
});
@@ -201,6 +204,23 @@ private void processBatch(List<ItemInfo> batch,
201204
}
202205
}
203206

207+
private void createRetryPartition(List<ItemInfo> itemInfoList, EnhancedSourceCoordinator coordinator) {
208+
if (itemInfoList.isEmpty()) {
209+
return;
210+
}
211+
ItemInfo itemInfo = itemInfoList.get(0);
212+
String partitionKey = itemInfo.getPartitionKey();
213+
List<String> itemIds = itemInfoList.stream().map(ItemInfo::getId).collect(Collectors.toList());
214+
PaginationCrawlerWorkerProgressState state = new PaginationCrawlerWorkerProgressState();
215+
state.setKeyAttributes(itemInfo.getKeyAttributes());
216+
state.setItemIds(itemIds);
217+
state.setExportStartTime(Instant.now());
218+
state.setLoadedItems(itemInfoList.size());
219+
SaasSourcePartition sourcePartition = new SaasSourcePartition(state, partitionKey);
220+
coordinator.createPartition(sourcePartition);
221+
}
222+
223+
204224
private void updateLeaderProgressState(LeaderPartition leaderPartition,
205225
String updatedToken,
206226
EnhancedSourceCoordinator coordinator) {

data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/LeaderOnlyTokenCrawlerTest.java

Lines changed: 70 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package org.opensearch.dataprepper.plugins.source.source_crawler.base;
22

3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
36
import org.junit.jupiter.api.BeforeEach;
47
import org.junit.jupiter.api.Test;
58
import org.junit.jupiter.api.extension.ExtendWith;
@@ -14,6 +17,8 @@
1417
import org.opensearch.dataprepper.model.record.Record;
1518
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
1619
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
20+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition;
21+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState;
1722
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState;
1823
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
1924
import org.opensearch.dataprepper.plugins.source.source_crawler.model.TestItemInfo;
@@ -29,12 +34,14 @@
2934

3035
import static org.junit.jupiter.api.Assertions.assertEquals;
3136
import static org.junit.jupiter.api.Assertions.assertThrows;
37+
import static org.junit.jupiter.api.Assertions.fail;
3238
import static org.mockito.ArgumentMatchers.any;
3339
import static org.mockito.ArgumentMatchers.eq;
3440
import static org.mockito.Mockito.verify;
3541
import static org.mockito.Mockito.when;
3642
import static org.mockito.Mockito.doThrow;
3743
import static org.mockito.Mockito.never;
44+
import static org.mockito.Mockito.doAnswer;
3845
import static org.mockito.internal.verification.VerificationModeFactory.times;
3946

4047
@ExtendWith(MockitoExtension.class)
@@ -144,29 +151,79 @@ void testProgressStateUpdate() {
144151
}
145152

146153
@Test
147-
void testNegativeAcknowledgment() {
148-
List<ItemInfo> items = createTestItems(BATCH_SIZE + 1);
149-
when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
150-
when(acknowledgementSetManager.create(any(), eq(TEST_TIMEOUT)))
151-
.thenReturn(acknowledgementSet);
154+
void testRetryPartitionStateSerializationAndDeserialization() {
155+
List<ItemInfo> batch = createTestItems( 1);
152156

153-
ArgumentCaptor<Consumer<Boolean>> callbackCaptor = ArgumentCaptor.forClass(Consumer.class);
157+
// Mock client to return our test items
158+
when(client.listItems(INITIAL_TOKEN)).thenReturn(batch.iterator());
154159

160+
// Capture the created partition
161+
ArgumentCaptor<SaasSourcePartition> partitionCaptor = ArgumentCaptor.forClass(SaasSourcePartition.class);
162+
163+
// Simulate negative acknowledgment
164+
when(acknowledgementSetManager.create(any(), any())).thenAnswer(inv -> {
165+
Consumer<Boolean> callback = inv.getArgument(0);
166+
callback.accept(false);
167+
return acknowledgementSet;
168+
});
169+
170+
// Create retry partition
155171
crawler.setAcknowledgementsEnabled(true);
156172
crawler.crawl(leaderPartition, coordinator);
157173

158-
verify(acknowledgementSetManager).create(callbackCaptor.capture(), eq(TEST_TIMEOUT));
174+
// Verify partition creation and get original state
175+
verify(coordinator).createPartition(partitionCaptor.capture());
176+
SaasSourcePartition originalPartition = partitionCaptor.getValue();
177+
PaginationCrawlerWorkerProgressState originalState =
178+
(PaginationCrawlerWorkerProgressState) originalPartition.getProgressState().get();
179+
180+
// Test serialization/deserialization
181+
ObjectMapper objectMapper = new ObjectMapper()
182+
.registerModule(new JavaTimeModule()); // For Instant serialization
183+
184+
try {
185+
// Serialize state
186+
String serializedState = objectMapper.writeValueAsString(originalState);
187+
188+
// Deserialize state
189+
PaginationCrawlerWorkerProgressState deserializedState =
190+
objectMapper.readValue(serializedState, PaginationCrawlerWorkerProgressState.class);
191+
192+
// Verify deserialized state matches original
193+
assertEquals(originalState.getItemIds(), deserializedState.getItemIds());
194+
assertEquals(originalState.getKeyAttributes(), deserializedState.getKeyAttributes());
195+
assertEquals(originalState.getExportStartTime(), deserializedState.getExportStartTime());
196+
assertEquals(originalState.getLoadedItems(), deserializedState.getLoadedItems());
197+
} catch (JsonProcessingException e) {
198+
fail("Serialization/deserialization failed", e);
199+
}
200+
}
159201

160-
// Simulate negative acknowledgment
161-
callbackCaptor.getValue().accept(false);
202+
@Test
203+
void testNegativeAcknowledgment() {
204+
List<ItemInfo> items = createTestItems(1);
205+
when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
206+
207+
// Setup immediate negative acknowledgment
208+
doAnswer(invocation -> {
209+
Consumer<Boolean> callback = invocation.getArgument(0);
210+
callback.accept(false); // Trigger negative ack immediately
211+
return acknowledgementSet;
212+
}).when(acknowledgementSetManager).create(any(), eq(TEST_TIMEOUT));
162213

214+
crawler.setAcknowledgementsEnabled(true);
215+
crawler.crawl(leaderPartition, coordinator);
216+
217+
// Verify behavior
163218
verify(client, times(1)).writeBatchToBuffer(any(), any(), any());
164-
verify(coordinator, never()).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));
219+
verify(coordinator, times(1)).createPartition(any());
220+
verify(acknowledgementSet, times(1)).complete();
165221
}
166222

223+
167224
@Test
168225
void testAcknowledgmentTimeout() {
169-
List<ItemInfo> items = createTestItems(BATCH_SIZE + 1);
226+
List<ItemInfo> items = createTestItems( 1);
170227
when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
171228
when(acknowledgementSetManager.create(any(), eq(TEST_TIMEOUT)))
172229
.thenReturn(acknowledgementSet);
@@ -178,13 +235,10 @@ void testAcknowledgmentTimeout() {
178235

179236
verify(acknowledgementSetManager).create(callbackCaptor.capture(), eq(TEST_TIMEOUT));
180237

181-
// Verify:
182-
// 1. Only first batch was processed
238+
// Verify timeout behavior
183239
verify(client, times(1)).writeBatchToBuffer(any(), any(), any());
184-
// 2. No checkpoint update happened
185-
verify(coordinator, never()).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));
186-
// 3. Acknowledgment set was completed
187240
verify(acknowledgementSet).complete();
241+
verify(coordinator).createPartition(any());
188242
}
189243

190244

0 commit comments

Comments
 (0)