Skip to content

Commit 179e3df

Browse files
authored
Adds integration tests for pipeline connectors. (#4834)
This commit adds integration testing for the pipeline connector sink/source which connects two pipelines. There are two tests. The first tests against a single connection with a single sink. The second test also includes a second sink to verify that pipeline connections work with additional sinks. This commit also includes fixes for CoreHttpServerIT. When running the new pipeline connector tests, the CoreHttpServerIT tests started failing. I found some places where shutdowns were not occurring and fixed those. And I added some additional logging to help debug. The root problem turned out to be that the ExecutorService used in the DataPrepperServer was a static field. The CoreHttpServerIT was working because it was the first test that JUnit chose. With the new tests, it is being chosen later and by that point, the static ExecutorService was shutdown. The fix is simply to avoid using a static ExecutorService. Signed-off-by: David Venable <dlv@amazon.com>
1 parent 0e32ec6 commit 179e3df

11 files changed

Lines changed: 319 additions & 18 deletions

File tree

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.integration;
7+
8+
import org.junit.jupiter.api.AfterEach;
9+
import org.junit.jupiter.api.BeforeEach;
10+
import org.junit.jupiter.api.Test;
11+
import org.opensearch.dataprepper.model.event.Event;
12+
import org.opensearch.dataprepper.model.event.JacksonEvent;
13+
import org.opensearch.dataprepper.model.record.Record;
14+
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
15+
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
16+
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;
17+
18+
import java.util.List;
19+
import java.util.UUID;
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.stream.Collectors;
22+
import java.util.stream.IntStream;
23+
24+
import static org.awaitility.Awaitility.await;
25+
import static org.hamcrest.CoreMatchers.equalTo;
26+
import static org.hamcrest.CoreMatchers.not;
27+
import static org.hamcrest.MatcherAssert.assertThat;
28+
import static org.hamcrest.Matchers.empty;
29+
30+
public class Connected_SingleExtraSinkIT {
31+
private static final String IN_MEMORY_IDENTIFIER = "Connected_SingleExtraSinkIT";
32+
private static final String IN_MEMORY_IDENTIFIER_ENTRY_SINK = IN_MEMORY_IDENTIFIER + "_Entry";
33+
private static final String IN_MEMORY_IDENTIFIER_EXIT_SINK = IN_MEMORY_IDENTIFIER + "_Exit";
34+
private static final String PIPELINE_CONFIGURATION_UNDER_TEST = "connected/single-connection-extra-sink.yaml";
35+
private DataPrepperTestRunner dataPrepperTestRunner;
36+
private InMemorySourceAccessor inMemorySourceAccessor;
37+
private InMemorySinkAccessor inMemorySinkAccessor;
38+
39+
@BeforeEach
40+
void setUp() {
41+
dataPrepperTestRunner = DataPrepperTestRunner.builder()
42+
.withPipelinesDirectoryOrFile(PIPELINE_CONFIGURATION_UNDER_TEST)
43+
.build();
44+
45+
dataPrepperTestRunner.start();
46+
inMemorySourceAccessor = dataPrepperTestRunner.getInMemorySourceAccessor();
47+
inMemorySinkAccessor = dataPrepperTestRunner.getInMemorySinkAccessor();
48+
}
49+
50+
@AfterEach
51+
void tearDown() {
52+
dataPrepperTestRunner.stop();
53+
}
54+
55+
@Test
56+
void pipeline_with_single_batch_of_records() {
57+
final int recordsToCreate = 200;
58+
final List<Record<Event>> inputRecords = IntStream.range(0, recordsToCreate)
59+
.mapToObj(i -> UUID.randomUUID().toString())
60+
.map(JacksonEvent::fromMessage)
61+
.map(Record::new)
62+
.collect(Collectors.toList());
63+
64+
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, inputRecords);
65+
66+
await().atMost(800, TimeUnit.MILLISECONDS)
67+
.untilAsserted(() -> {
68+
assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_ENTRY_SINK), not(empty()));
69+
assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_EXIT_SINK), not(empty()));
70+
});
71+
72+
assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_ENTRY_SINK).size(), equalTo(recordsToCreate));
73+
assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_EXIT_SINK).size(), equalTo(recordsToCreate));
74+
}
75+
76+
@Test
77+
void pipeline_with_multiple_batches_of_records() {
78+
final int recordsToCreateBatch1 = 200;
79+
final List<Record<Event>> inputRecordsBatch1 = IntStream.range(0, recordsToCreateBatch1)
80+
.mapToObj(i -> UUID.randomUUID().toString())
81+
.map(JacksonEvent::fromMessage)
82+
.map(Record::new)
83+
.collect(Collectors.toList());
84+
85+
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, inputRecordsBatch1);
86+
87+
await().atMost(800, TimeUnit.MILLISECONDS)
88+
.untilAsserted(() -> {
89+
assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_ENTRY_SINK), not(empty()));
90+
assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_EXIT_SINK), not(empty()));
91+
});
92+
93+
assertThat(inMemorySinkAccessor.getAndClear(IN_MEMORY_IDENTIFIER_ENTRY_SINK).size(), equalTo(recordsToCreateBatch1));
94+
assertThat(inMemorySinkAccessor.getAndClear(IN_MEMORY_IDENTIFIER_EXIT_SINK).size(), equalTo(recordsToCreateBatch1));
95+
96+
final int recordsToCreateBatch2 = 300;
97+
final List<Record<Event>> inputRecordsBatch2 = IntStream.range(0, recordsToCreateBatch2)
98+
.mapToObj(i -> UUID.randomUUID().toString())
99+
.map(JacksonEvent::fromMessage)
100+
.map(Record::new)
101+
.collect(Collectors.toList());
102+
103+
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, inputRecordsBatch2);
104+
105+
await().atMost(400, TimeUnit.MILLISECONDS)
106+
.untilAsserted(() -> {
107+
assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_ENTRY_SINK), not(empty()));
108+
assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER_EXIT_SINK), not(empty()));
109+
});
110+
111+
assertThat(inMemorySinkAccessor.getAndClear(IN_MEMORY_IDENTIFIER_ENTRY_SINK).size(), equalTo(recordsToCreateBatch2));
112+
assertThat(inMemorySinkAccessor.getAndClear(IN_MEMORY_IDENTIFIER_EXIT_SINK).size(), equalTo(recordsToCreateBatch2));
113+
}
114+
115+
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.integration;
7+
8+
import org.junit.jupiter.api.AfterEach;
9+
import org.junit.jupiter.api.BeforeEach;
10+
import org.junit.jupiter.api.Test;
11+
import org.opensearch.dataprepper.model.event.Event;
12+
import org.opensearch.dataprepper.model.event.JacksonEvent;
13+
import org.opensearch.dataprepper.model.record.Record;
14+
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
15+
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
16+
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;
17+
18+
import java.util.Collections;
19+
import java.util.List;
20+
import java.util.UUID;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.stream.Collectors;
23+
import java.util.stream.IntStream;
24+
25+
import static org.awaitility.Awaitility.await;
26+
import static org.hamcrest.CoreMatchers.equalTo;
27+
import static org.hamcrest.CoreMatchers.is;
28+
import static org.hamcrest.CoreMatchers.not;
29+
import static org.hamcrest.MatcherAssert.assertThat;
30+
import static org.hamcrest.Matchers.empty;
31+
32+
public class Connected_SingleIT {
33+
private static final String IN_MEMORY_IDENTIFIER = "Connected_SingleIT";
34+
private static final String PIPELINE_CONFIGURATION_UNDER_TEST = "connected/single-connection.yaml";
35+
private DataPrepperTestRunner dataPrepperTestRunner;
36+
private InMemorySourceAccessor inMemorySourceAccessor;
37+
private InMemorySinkAccessor inMemorySinkAccessor;
38+
39+
@BeforeEach
40+
void setUp() {
41+
dataPrepperTestRunner = DataPrepperTestRunner.builder()
42+
.withPipelinesDirectoryOrFile(PIPELINE_CONFIGURATION_UNDER_TEST)
43+
.build();
44+
45+
dataPrepperTestRunner.start();
46+
inMemorySourceAccessor = dataPrepperTestRunner.getInMemorySourceAccessor();
47+
inMemorySinkAccessor = dataPrepperTestRunner.getInMemorySinkAccessor();
48+
}
49+
50+
@AfterEach
51+
void tearDown() {
52+
dataPrepperTestRunner.stop();
53+
}
54+
55+
@Test
56+
void pipeline_with_no_data() throws InterruptedException {
57+
final List<Record<Event>> preRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
58+
assertThat(preRecords, is(empty()));
59+
60+
Thread.sleep(1400);
61+
62+
final List<Record<Event>> postRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
63+
assertThat(postRecords, is(empty()));
64+
}
65+
66+
@Test
67+
void pipeline_with_single_record() {
68+
final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
69+
final Record<Event> eventRecord = new Record<>(event);
70+
71+
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, Collections.singletonList(eventRecord));
72+
73+
await().atMost(800, TimeUnit.MILLISECONDS)
74+
.untilAsserted(() -> {
75+
assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER), not(empty()));
76+
});
77+
78+
assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER).size(), equalTo(1));
79+
}
80+
81+
@Test
82+
void pipeline_with_single_batch_of_records() {
83+
final int recordsToCreate = 200;
84+
final List<Record<Event>> inputRecords = IntStream.range(0, recordsToCreate)
85+
.mapToObj(i -> UUID.randomUUID().toString())
86+
.map(JacksonEvent::fromMessage)
87+
.map(Record::new)
88+
.collect(Collectors.toList());
89+
90+
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, inputRecords);
91+
92+
await().atMost(800, TimeUnit.MILLISECONDS)
93+
.untilAsserted(() -> {
94+
assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER).size(), equalTo(recordsToCreate));
95+
});
96+
}
97+
98+
@Test
99+
void pipeline_with_multiple_batches_of_records() {
100+
final int recordsToCreateBatch1 = 200;
101+
final List<Record<Event>> inputRecordsBatch1 = IntStream.range(0, recordsToCreateBatch1)
102+
.mapToObj(i -> UUID.randomUUID().toString())
103+
.map(JacksonEvent::fromMessage)
104+
.map(Record::new)
105+
.collect(Collectors.toList());
106+
107+
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, inputRecordsBatch1);
108+
109+
await().atMost(800, TimeUnit.MILLISECONDS)
110+
.untilAsserted(() -> {
111+
assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER).size(), equalTo(recordsToCreateBatch1));
112+
});
113+
114+
assertThat(inMemorySinkAccessor.getAndClear(IN_MEMORY_IDENTIFIER).size(), equalTo(recordsToCreateBatch1));
115+
116+
final int recordsToCreateBatch2 = 300;
117+
final List<Record<Event>> inputRecordsBatch2 = IntStream.range(0, recordsToCreateBatch2)
118+
.mapToObj(i -> UUID.randomUUID().toString())
119+
.map(JacksonEvent::fromMessage)
120+
.map(Record::new)
121+
.collect(Collectors.toList());
122+
123+
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, inputRecordsBatch2);
124+
125+
await().atMost(400, TimeUnit.MILLISECONDS)
126+
.untilAsserted(() -> {
127+
assertThat(inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER).size(), equalTo(recordsToCreateBatch2));
128+
});
129+
130+
assertThat(inMemorySinkAccessor.getAndClear(IN_MEMORY_IDENTIFIER).size(), equalTo(recordsToCreateBatch2));
131+
}
132+
133+
}

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/CoreHttpServerIT.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,25 @@
66
package org.opensearch.dataprepper.integration;
77

88
import com.linecorp.armeria.client.WebClient;
9+
import com.linecorp.armeria.common.AggregatedHttpResponse;
910
import com.linecorp.armeria.common.HttpMethod;
1011
import com.linecorp.armeria.common.HttpStatus;
1112
import com.linecorp.armeria.common.RequestHeaders;
1213
import com.linecorp.armeria.common.SessionProtocol;
1314
import org.junit.jupiter.api.AfterEach;
1415
import org.junit.jupiter.api.BeforeEach;
1516
import org.junit.jupiter.api.Test;
16-
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
17-
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
1817
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
1920

2021
import static org.hamcrest.CoreMatchers.equalTo;
2122
import static org.hamcrest.MatcherAssert.assertThat;
2223

2324
class CoreHttpServerIT {
25+
private static final Logger log = LoggerFactory.getLogger(CoreHttpServerIT.class);
2426
private static final String PIPELINE_CONFIGURATION_UNDER_TEST = "minimal-pipeline.yaml";
2527
private DataPrepperTestRunner dataPrepperTestRunner;
26-
private InMemorySourceAccessor inMemorySourceAccessor;
27-
private InMemorySinkAccessor inMemorySinkAccessor;
2828

2929
@BeforeEach
3030
void setUp() {
@@ -33,8 +33,6 @@ void setUp() {
3333
.build();
3434

3535
dataPrepperTestRunner.start();
36-
inMemorySourceAccessor = dataPrepperTestRunner.getInMemorySourceAccessor();
37-
inMemorySinkAccessor = dataPrepperTestRunner.getInMemorySinkAccessor();
3836
}
3937

4038
@AfterEach
@@ -44,17 +42,16 @@ void tearDown() {
4442

4543
@Test
4644
void verify_list_api_is_running() {
47-
WebClient.of().execute(RequestHeaders.builder()
45+
log.info("Making API request for test.");
46+
final AggregatedHttpResponse response = WebClient.of().execute(RequestHeaders.builder()
4847
.scheme(SessionProtocol.HTTP)
4948
.authority("127.0.0.1:4900")
5049
.method(HttpMethod.GET)
5150
.path("/list")
5251
.build())
5352
.aggregate()
54-
.whenComplete((response, ex) -> {
55-
assertThat("Http Status", response.status(), equalTo(HttpStatus.OK));
56-
})
5753
.join();
58-
}
5954

55+
assertThat(response.status(), equalTo(HttpStatus.OK));
56+
}
6057
}

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySource.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,11 @@ public boolean areAcknowledgementsEnabled() {
6868
@Override
6969
public void stop() {
7070
isStopped = true;
71-
runningThread.interrupt();
71+
try {
72+
runningThread.join(1000);
73+
} catch (final InterruptedException e) {
74+
runningThread.interrupt();
75+
}
7276
}
7377

7478
private class SourceRunner implements Runnable {

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/test/framework/DataPrepperTestRunner.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public void start() {
6666
public void stop() {
6767
final DataPrepper dataPrepper = contextManager.getDataPrepperBean();
6868
dataPrepper.shutdown();
69+
contextManager.shutdown();
6970
}
7071

7172
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
entry-pipeline:
2+
delay: 5
3+
source:
4+
in_memory:
5+
testing_key: Connected_SingleExtraSinkIT
6+
sink:
7+
- pipeline:
8+
name: exit-pipeline
9+
- in_memory:
10+
testing_key: Connected_SingleExtraSinkIT_Entry
11+
12+
exit-pipeline:
13+
delay: 5
14+
source:
15+
pipeline:
16+
name: entry-pipeline
17+
sink:
18+
- in_memory:
19+
testing_key: Connected_SingleExtraSinkIT_Exit
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
entry-pipeline:
2+
delay: 5
3+
source:
4+
in_memory:
5+
testing_key: Connected_SingleIT
6+
sink:
7+
- pipeline:
8+
name: exit-pipeline
9+
10+
exit-pipeline:
11+
delay: 5
12+
source:
13+
pipeline:
14+
name: entry-pipeline
15+
sink:
16+
- in_memory:
17+
testing_key: Connected_SingleIT

data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
@SuppressWarnings({"rawtypes", "unchecked"})
5555
public class Pipeline {
5656
private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);
57+
private static final int SINK_LOGGING_FREQUENCY = (int) Duration.ofSeconds(60).toMillis();
5758
private volatile AtomicBoolean stopRequested;
5859

5960
private final String name;
@@ -249,12 +250,13 @@ public void execute() {
249250

250251
sinkExecutorService.submit(() -> {
251252
long retryCount = 0;
253+
final long sleepIfNotReadyTime = 200;
252254
while (!isReady() && !isStopRequested()) {
253-
if (retryCount++ % 60 == 0) {
255+
if (retryCount++ % (SINK_LOGGING_FREQUENCY / sleepIfNotReadyTime) == 0) {
254256
LOG.info("Pipeline [{}] Waiting for Sink to be ready", name);
255257
}
256258
try {
257-
Thread.sleep(1000);
259+
Thread.sleep(sleepIfNotReadyTime);
258260
} catch (Exception e){}
259261
}
260262
startSourceAndProcessors();

0 commit comments

Comments
 (0)