diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java index ccb70d536c..9748304e39 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java @@ -33,6 +33,8 @@ @FixMethodOrder() class PipelinesWithAcksIT { private static final Logger LOG = LoggerFactory.getLogger(PipelinesWithAcksIT.class); + private static final int NUM_RECORDS = 100; + private static final int WAIT_TIME_MS = 60000; private static final String IN_MEMORY_IDENTIFIER = "PipelinesWithAcksIT"; private static final String SIMPLE_PIPELINE_CONFIGURATION_UNDER_TEST = "acknowledgements/simple-test.yaml"; private static final String TWO_PIPELINES_CONFIGURATION_UNDER_TEST = "acknowledgements/two-pipelines-test.yaml"; @@ -71,7 +73,7 @@ void simple_pipeline_with_single_record() { final int numRecords = 1; inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(40000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); @@ -84,14 +86,13 @@ void simple_pipeline_with_single_record() { @Test void simple_pipeline_with_multiple_records() { setUp(SIMPLE_PIPELINE_CONFIGURATION_UNDER_TEST); - final int numRecords = 100; - inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, NUM_RECORDS); - await().atMost(40000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); - assertThat(outputRecords.size(), equalTo(numRecords)); + assertThat(outputRecords.size(), equalTo(NUM_RECORDS)); }); assertTrue(inMemorySourceAccessor.getAckReceived()); } @@ -99,14 +100,13 @@ void simple_pipeline_with_multiple_records() { @Test void two_pipelines_with_multiple_records() { setUp(TWO_PIPELINES_CONFIGURATION_UNDER_TEST); - final int numRecords = 100; - inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, NUM_RECORDS); - await().atMost(40000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); - assertThat(outputRecords.size(), equalTo(numRecords)); + assertThat(outputRecords.size(), equalTo(NUM_RECORDS)); }); assertTrue(inMemorySourceAccessor.getAckReceived()); } @@ -114,14 +114,13 @@ void two_pipelines_with_multiple_records() { @Test void three_pipelines_with_multiple_records() { setUp(THREE_PIPELINES_CONFIGURATION_UNDER_TEST); - final int numRecords = 100; - inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, NUM_RECORDS); - await().atMost(40000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); - assertThat(outputRecords.size(), equalTo(numRecords)); + assertThat(outputRecords.size(), equalTo(NUM_RECORDS)); }); assertTrue(inMemorySourceAccessor.getAckReceived()); } @@ -132,7 +131,7 @@ void three_pipelines_with_all_unrouted_records() { final int numRecords = 2; inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords); - await().atMost(40000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS) .untilAsserted(() -> { assertNotNull(inMemorySourceAccessor); assertNotNull(inMemorySourceAccessor.getAckReceived()); @@ -145,14 +144,13 @@ void three_pipelines_with_all_unrouted_records() { @Test void three_pipelines_with_route_and_multiple_records() { setUp(THREE_PIPELINES_WITH_ROUTE_CONFIGURATION_UNDER_TEST); - final int numRecords = 100; - inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords); + inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, NUM_RECORDS); - await().atMost(40000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); - assertThat(outputRecords.size(), lessThanOrEqualTo(numRecords)); + assertThat(outputRecords.size(), lessThanOrEqualTo(NUM_RECORDS)); }); assertThat(inMemorySourceAccessor.getAckReceived(), equalTo(true)); } @@ -160,15 +158,14 @@ void three_pipelines_with_route_and_multiple_records() { @Test void three_pipelines_with_default_route_and_multiple_records() { setUp(THREE_PIPELINES_WITH_DEFAULT_ROUTE_CONFIGURATION_UNDER_TEST); - final int numRecords = 10; - inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords); + inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, NUM_RECORDS); - await().atMost(40000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); - assertThat(outputRecords.size(), equalTo(2*numRecords)); + assertThat(outputRecords.size(), equalTo(2*NUM_RECORDS)); }); assertTrue(inMemorySourceAccessor.getAckReceived()); } @@ -176,14 +173,13 @@ void three_pipelines_with_default_route_and_multiple_records() { @Test void two_parallel_pipelines_multiple_records() { setUp(TWO_PARALLEL_PIPELINES_CONFIGURATION_UNDER_TEST); - final int numRecords = 100; - inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, NUM_RECORDS); - await().atMost(40000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); - assertThat(outputRecords.size(), equalTo(2*numRecords)); + assertThat(outputRecords.size(), equalTo(2*NUM_RECORDS)); }); assertTrue(inMemorySourceAccessor.getAckReceived()); } @@ -191,14 +187,13 @@ void two_parallel_pipelines_multiple_records() { @Test void three_pipelines_multi_sink_multiple_records() { setUp(THREE_PIPELINES_MULTI_SINK_CONFIGURATION_UNDER_TEST); - final int numRecords = 100; - inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, NUM_RECORDS); - await().atMost(40000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); - assertThat(outputRecords.size(), equalTo(3*numRecords)); + assertThat(outputRecords.size(), equalTo(3*NUM_RECORDS)); }); assertTrue(inMemorySourceAccessor.getAckReceived()); } @@ -206,14 +201,13 @@ void three_pipelines_multi_sink_multiple_records() { @Test void one_pipeline_three_sinks_multiple_records() { setUp(ONE_PIPELINE_THREE_SINKS_CONFIGURATION_UNDER_TEST); - final int numRecords = 100; - inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, NUM_RECORDS); - await().atMost(40000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); - assertThat(outputRecords.size(), equalTo(3*numRecords)); + assertThat(outputRecords.size(), equalTo(3*NUM_RECORDS)); }); assertTrue(inMemorySourceAccessor.getAckReceived()); } @@ -221,14 +215,13 @@ void one_pipeline_three_sinks_multiple_records() { @Test void one_pipeline_ack_expiry_multiple_records() { setUp(ONE_PIPELINE_ACK_EXPIRY_CONFIGURATION_UNDER_TEST); - final int numRecords = 100; - inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, NUM_RECORDS); - await().atMost(40000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); - assertThat(outputRecords.size(), equalTo(numRecords)); + assertThat(outputRecords.size(), equalTo(NUM_RECORDS)); }); assertThat(inMemorySourceAccessor.getAckReceived(), equalTo(null)); } @@ -236,15 +229,14 @@ void one_pipeline_ack_expiry_multiple_records() { @Test void one_pipeline_three_sinks_negative_ack_multiple_records() { setUp(ONE_PIPELINE_THREE_SINKS_CONFIGURATION_UNDER_TEST); - final int numRecords = 100; - inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, NUM_RECORDS); inMemorySinkAccessor.setResult(false); - await().atMost(40000, TimeUnit.MILLISECONDS) + await().atMost(WAIT_TIME_MS, TimeUnit.MILLISECONDS) .untilAsserted(() -> { List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER); assertThat(outputRecords, not(empty())); - assertThat(outputRecords.size(), equalTo(3*numRecords)); + assertThat(outputRecords.size(), equalTo(3*NUM_RECORDS)); }); assertFalse(inMemorySourceAccessor.getAckReceived()); } diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_ThreeRoutesDefaultIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_ThreeRoutesDefaultIT.java index db94eef044..3c4c09734d 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_ThreeRoutesDefaultIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_ThreeRoutesDefaultIT.java @@ -126,7 +126,7 @@ void test_default_route() { inMemorySourceAccessor.submit(TESTING_KEY, allEvents); - await().atMost(2, TimeUnit.SECONDS) + await().atMost(30, TimeUnit.SECONDS) .untilAsserted(() -> { assertThat(inMemorySinkAccessor.get(ALPHA_SOURCE_KEY), not(empty())); assertThat(inMemorySinkAccessor.get(BETA_SOURCE_KEY), not(empty())); @@ -134,34 +134,34 @@ void test_default_route() { assertThat(inMemorySinkAccessor.get(ALPHA_DEFAULT_SOURCE_KEY), not(empty())); assertThat(inMemorySinkAccessor.get(ALPHA_BETA_GAMMA_SOURCE_KEY), not(empty())); assertThat(inMemorySinkAccessor.get(DEFAULT_SOURCE_KEY), not(empty())); - }); - final List> actualAlphaRecords = inMemorySinkAccessor.get(ALPHA_SOURCE_KEY); + final List> actualAlphaRecords = inMemorySinkAccessor.get(ALPHA_SOURCE_KEY); - assertThat(actualAlphaRecords.size(), equalTo(alphaEvents.size())); + assertThat(actualAlphaRecords.size(), equalTo(alphaEvents.size())); - assertThat(actualAlphaRecords, containsInAnyOrder(allEvents.stream() - .filter(alphaEvents::contains).toArray())); + assertThat(actualAlphaRecords, containsInAnyOrder(allEvents.stream() + .filter(alphaEvents::contains).toArray())); - final List> actualBetaRecords = inMemorySinkAccessor.get(BETA_SOURCE_KEY); + final List> actualBetaRecords = inMemorySinkAccessor.get(BETA_SOURCE_KEY); - assertThat(actualBetaRecords.size(), equalTo(betaEvents.size())); + assertThat(actualBetaRecords.size(), equalTo(betaEvents.size())); - assertThat(actualBetaRecords, containsInAnyOrder(allEvents.stream() - .filter(betaEvents::contains).toArray())); + assertThat(actualBetaRecords, containsInAnyOrder(allEvents.stream() + .filter(betaEvents::contains).toArray())); - final List> actualDefaultRecords = inMemorySinkAccessor.get(DEFAULT_SOURCE_KEY); + final List> actualDefaultRecords = inMemorySinkAccessor.get(DEFAULT_SOURCE_KEY); - assertThat(actualDefaultRecords.size(), equalTo(defaultEvents.size())); - assertThat(actualDefaultRecords, containsInAnyOrder(allEvents.stream() - .filter(defaultEvents::contains).toArray())); + assertThat(actualDefaultRecords.size(), equalTo(defaultEvents.size())); + assertThat(actualDefaultRecords, containsInAnyOrder(allEvents.stream() + .filter(defaultEvents::contains).toArray())); - final List> actualAlphaDefaultRecords = new ArrayList<>(); - actualAlphaDefaultRecords.addAll(actualAlphaRecords); - actualAlphaDefaultRecords.addAll(actualDefaultRecords); - assertThat(actualAlphaDefaultRecords.size(), equalTo(defaultEvents.size()+alphaEvents.size())); - assertThat(actualAlphaDefaultRecords, containsInAnyOrder(allEvents.stream() - .filter(event -> defaultEvents.contains(event) || alphaEvents.contains(event)).toArray())); + final List> actualAlphaDefaultRecords = new ArrayList<>(); + actualAlphaDefaultRecords.addAll(actualAlphaRecords); + actualAlphaDefaultRecords.addAll(actualDefaultRecords); + assertThat(actualAlphaDefaultRecords.size(), equalTo(defaultEvents.size()+alphaEvents.size())); + assertThat(actualAlphaDefaultRecords, containsInAnyOrder(allEvents.stream() + .filter(event -> defaultEvents.contains(event) || alphaEvents.contains(event)).toArray())); + }); }