|
40 | 40 |
|
41 | 41 | import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; |
42 | 42 | import org.opensearch.dataprepper.model.event.Event; |
| 43 | +import org.opensearch.dataprepper.model.event.EventHandle; |
43 | 44 | import org.opensearch.dataprepper.model.log.JacksonLog; |
44 | 45 | import org.opensearch.dataprepper.model.record.Record; |
45 | 46 | import software.amazon.awssdk.regions.Region; |
|
50 | 51 | import static org.awaitility.Awaitility.await; |
51 | 52 | import static org.mockito.ArgumentMatchers.anyString; |
52 | 53 | import static org.mockito.ArgumentMatchers.any; |
| 54 | +import static org.mockito.Mockito.verify; |
| 55 | +import static org.mockito.Mockito.times; |
53 | 56 | import static org.mockito.Mockito.mock; |
54 | 57 | import static org.mockito.Mockito.when; |
55 | 58 | import static org.mockito.Mockito.lenient; |
@@ -88,6 +91,9 @@ public class CloudWatchLogsIT { |
88 | 91 | @Mock |
89 | 92 | private ThresholdConfig thresholdConfig; |
90 | 93 |
|
| 94 | + @Mock |
| 95 | + private EventHandle eventHandle; |
| 96 | + |
91 | 97 | @Mock |
92 | 98 | private CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig; |
93 | 99 |
|
@@ -127,6 +133,7 @@ void setUp() { |
127 | 133 | requestsFailedCount = new AtomicInteger(0); |
128 | 134 | dlqSuccessCount = new AtomicInteger(0); |
129 | 135 | objectMapper = new ObjectMapper(); |
| 136 | + eventHandle = mock(EventHandle.class); |
130 | 137 | pluginSetting = mock(PluginSetting.class); |
131 | 138 | pluginFactory = mock(PluginFactory.class); |
132 | 139 | when(pluginSetting.getPipelineName()).thenReturn("pipeline"); |
@@ -208,6 +215,7 @@ void setUp() { |
208 | 215 | when(thresholdConfig.getFlushInterval()).thenReturn(60L); |
209 | 216 | when(thresholdConfig.getMaxEventSizeBytes()).thenReturn(1000L); |
210 | 217 | when(cloudWatchLogsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig); |
| 218 | + when(cloudWatchLogsSinkConfig.getWorkers()).thenReturn(3); |
211 | 219 | } |
212 | 220 |
|
213 | 221 | private List<String> listObjectsWithPrefix(String bucketName, String prefix) { |
@@ -297,7 +305,7 @@ void TestSinkOperationWithLogSendInterval() throws Exception { |
297 | 305 | assertThat(eventsSuccessCount.get(), equalTo(NUM_RECORDS)); |
298 | 306 | assertThat(requestsSuccessCount.get(), equalTo(1)); |
299 | 307 | assertThat(dlqSuccessCount.get(), equalTo(0)); |
300 | | - |
| 308 | + verify(eventHandle, times(NUM_RECORDS)).release(true); |
301 | 309 | } |
302 | 310 |
|
303 | 311 | @Test |
@@ -333,6 +341,7 @@ void TestSinkOperationWithBatchSize() throws Exception { |
333 | 341 | assertThat(eventsSuccessCount.get(), equalTo(NUM_RECORDS)); |
334 | 342 | assertThat(requestsSuccessCount.get(), equalTo(NUM_RECORDS)); |
335 | 343 | assertThat(dlqSuccessCount.get(), equalTo(0)); |
| 344 | + verify(eventHandle, times(NUM_RECORDS)).release(true); |
336 | 345 |
|
337 | 346 | } |
338 | 347 |
|
@@ -368,6 +377,7 @@ void TestSinkOperationWithMaxRequestSize() throws Exception { |
368 | 377 | assertThat(eventsSuccessCount.get(), equalTo(NUM_RECORDS)); |
369 | 378 | assertThat(requestsSuccessCount.get(), equalTo(1)); |
370 | 379 | assertThat(dlqSuccessCount.get(), equalTo(0)); |
| 380 | + verify(eventHandle, times(NUM_RECORDS)).release(true); |
371 | 381 |
|
372 | 382 | } |
373 | 383 |
|
@@ -423,6 +433,7 @@ void testWithLargeSingleMessagesSentToDLQ() { |
423 | 433 | assertThat(eventsSuccessCount.get(), equalTo(NUM_RECORDS)); |
424 | 434 | assertThat(requestsSuccessCount.get(), equalTo(1)); |
425 | 435 | assertThat(dlqSuccessCount.get(), equalTo(1)); |
| 436 | + verify(eventHandle, times(NUM_RECORDS+1)).release(true); |
426 | 437 |
|
427 | 438 | } |
428 | 439 |
|
@@ -461,21 +472,28 @@ void testWithBadCredentials_AllEventsToDLQ() { |
461 | 472 | assertThat(eventsSuccessCount.get(), equalTo(0)); |
462 | 473 | assertThat(requestsSuccessCount.get(), equalTo(0)); |
463 | 474 | assertThat(dlqSuccessCount.get(), equalTo(NUM_RECORDS)); |
| 475 | + verify(eventHandle, times(NUM_RECORDS)).release(true); |
464 | 476 |
|
465 | 477 | } |
466 | 478 |
|
467 | 479 | private Collection<Record<Event>> getRecordList(int numberOfRecords) { |
468 | 480 | final Collection<Record<Event>> recordList = new ArrayList<>(); |
469 | 481 | List<HashMap> records = generateRecords(numberOfRecords); |
470 | 482 | for (int i = 0; i < numberOfRecords; i++) { |
471 | | - final Event event = JacksonLog.builder().withData(records.get(i)).build(); |
| 483 | + final Event event = JacksonLog.builder() |
| 484 | + .withData(records.get(i)) |
| 485 | + .withEventHandle(eventHandle) |
| 486 | + .build(); |
472 | 487 | recordList.add(new Record<>(event)); |
473 | 488 | } |
474 | 489 | return recordList; |
475 | 490 | } |
476 | 491 |
|
477 | 492 | private Record<Event> getLargeRecord(int size) { |
478 | | - final Event event = JacksonLog.builder().withData(Map.of("key", RandomStringUtils.randomAlphabetic(size))).build(); |
| 493 | + final Event event = JacksonLog.builder() |
| 494 | + .withData(Map.of("key", RandomStringUtils.randomAlphabetic(size))) |
| 495 | + .withEventHandle(eventHandle) |
| 496 | + .build(); |
479 | 497 | return new Record<>(event); |
480 | 498 | } |
481 | 499 |
|
|
0 commit comments