Skip to content

Commit 0266303

Browse files
bbenner7635 and Brendan Benner authored
Skip second completePartition when WorkerScheduler is processing partition (#6066)
Signed-off-by: Brendan Benner <bbenner@amazon.com> Co-authored-by: Brendan Benner <bbenner@amazon.com>
1 parent b85edbe commit 0266303

2 files changed

Lines changed: 55 additions & 3 deletions

File tree

  • data-prepper-plugins/saas-source-plugins/source-crawler/src
    • main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler
    • test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerScheduler.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig;
1414
import org.opensearch.dataprepper.plugins.source.source_crawler.base.SaasWorkerProgressState;
1515
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition;
16+
import com.google.common.annotations.VisibleForTesting;
1617
import org.slf4j.Logger;
1718
import org.slf4j.LoggerFactory;
1819

@@ -111,13 +112,21 @@ private void processPartition(EnhancedSourcePartition partition, Buffer<Record<E
111112
AcknowledgementSet acknowledgementSet = null;
112113
if (sourceConfig.isAcknowledgments()) {
113114
acknowledgementSet = createAcknowledgementSet(partition);
115+
// When acknowledgments are enabled, partition completion is handled in the acknowledgment callback
116+
crawler.executePartition((SaasWorkerProgressState) partition.getProgressState().get(), buffer, acknowledgementSet);
117+
} else {
118+
// When acknowledgments are disabled, complete the partition immediately after execution
119+
crawler.executePartition((SaasWorkerProgressState) partition.getProgressState().get(), buffer, acknowledgementSet);
120+
sourceCoordinator.completePartition(partition);
114121
}
115-
crawler.executePartition((SaasWorkerProgressState) partition.getProgressState().get(), buffer, acknowledgementSet);
122+
} else {
123+
// If no progress state, complete the partition immediately
124+
sourceCoordinator.completePartition(partition);
116125
}
117-
sourceCoordinator.completePartition(partition);
118126
}
119127

120-
private AcknowledgementSet createAcknowledgementSet(EnhancedSourcePartition partition) {
128+
@VisibleForTesting
129+
AcknowledgementSet createAcknowledgementSet(EnhancedSourcePartition partition) {
121130
return acknowledgementSetManager.create((result) -> {
122131
if (result) {
123132
acknowledgementSetSuccesses.increment();

data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/WorkerSchedulerTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030
import static org.junit.jupiter.api.Assertions.assertEquals;
3131
import static org.junit.jupiter.api.Assertions.assertNotNull;
3232
import static org.junit.jupiter.api.Assertions.assertTrue;
33+
import static org.mockito.ArgumentMatchers.any;
3334
import static org.mockito.ArgumentMatchers.eq;
3435
import static org.mockito.BDDMockito.given;
3536
import static org.mockito.Mockito.atLeast;
37+
import static org.mockito.Mockito.times;
3638
import static org.mockito.Mockito.verify;
3739
import static org.mockito.Mockito.verifyNoInteractions;
3840
import static org.mockito.Mockito.when;
@@ -216,4 +218,45 @@ void testRetryBackOffTriggeredWhenExceptionOccurred() throws InterruptedExceptio
216218
// Crawler shouldn't be invoked in this case
217219
verifyNoInteractions(crawler);
218220
}
221+
222+
@Test
223+
void testCompletePartitionWithAcknowledgements() throws InterruptedException {
224+
WorkerScheduler workerScheduler = new WorkerScheduler(pluginName, buffer,
225+
coordinator, sourceConfig, crawler, pluginMetrics, acknowledgementSetManager);
226+
WorkerScheduler spyWorkerScheduler = org.mockito.Mockito.spy(workerScheduler);
227+
228+
when(sourceConfig.isAcknowledgments()).thenReturn(true);
229+
230+
PaginationCrawlerWorkerProgressState mockProgressState = new PaginationCrawlerWorkerProgressState();
231+
232+
EnhancedSourcePartition mockPartition = org.mockito.Mockito.mock(EnhancedSourcePartition.class);
233+
when(mockPartition.getProgressState()).thenReturn(Optional.of(mockProgressState));
234+
235+
org.mockito.Mockito.doAnswer(invocation -> {
236+
coordinator.completePartition(mockPartition);
237+
return acknowledgementSet;
238+
}).when(spyWorkerScheduler).createAcknowledgementSet(any(EnhancedSourcePartition.class));
239+
240+
given(coordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE))
241+
.willReturn(Optional.of(mockPartition))
242+
.willReturn(Optional.empty());
243+
244+
ExecutorService executorService = Executors.newSingleThreadExecutor();
245+
executorService.submit(spyWorkerScheduler);
246+
247+
Thread.sleep(500);
248+
executorService.shutdownNow();
249+
250+
verify(mockPartition, atLeast(1)).getProgressState();
251+
verify(sourceConfig, atLeast(1)).isAcknowledgments();
252+
253+
// Verify that createAcknowledgementSet was called
254+
verify(spyWorkerScheduler, times(1)).createAcknowledgementSet(eq(mockPartition));
255+
256+
// Verify that crawler.executePartition was called with acknowledgement set
257+
verify(crawler, times(1)).executePartition(any(SaasWorkerProgressState.class), eq(buffer), eq(acknowledgementSet));
258+
259+
// Verify that coordinator.completePartition was called (from the acknowledgement callback with true)
260+
verify(coordinator, times(1)).completePartition(eq(mockPartition));
261+
}
219262
}

0 commit comments

Comments
 (0)