Skip to content

Commit f6c5b40

Browse files
committed
Add E2E test
Signed-off-by: Tomas Longo <tlongo@sternad.de>
1 parent 656475c commit f6c5b40

7 files changed

Lines changed: 187 additions & 17 deletions

File tree

data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,11 @@ private Server createServer(ServerBuilder serverBuilder, Buffer<Record<Object>>
177177

178178
private void configureHttpService(ServerBuilder serverBuilder, Buffer<Record<Object>> buffer, BlockingQueue<Runnable> blockingQueue) {
179179
final String path = oTelLogsSourceConfig.getHttpPath().replace("${pipelineName}", pipelineName);
180+
LOG.info("Configuring HTTP service under {} ", path);
181+
180182

181183
final ArmeriaHttpService armeriaHttpService = new ArmeriaHttpService(buffer, pluginMetrics, 100);
182-
final RetryInfoConfig retryInfo = oTelLogsSourceConfig.getRetryInfo();
184+
final RetryInfoConfig retryInfo = oTelLogsSourceConfig.getRetryInfo() != null ? oTelLogsSourceConfig.getRetryInfo() : new RetryInfoConfig();
183185
final HttpExceptionHandler httpExceptionHandler = new HttpExceptionHandler(pluginMetrics, retryInfo.getMinDelay(), retryInfo.getMaxDelay());
184186

185187
final int maxPendingRequests = MAX_PENDING_REQUESTS;
@@ -199,6 +201,7 @@ private void configureHttpService(ServerBuilder serverBuilder, Buffer<Record<Obj
199201
}
200202

201203
private void configureGrpcService(ServerBuilder serverBuilder, Buffer<Record<Object>> buffer) {
204+
LOG.info("Configuring gRPC service");
202205

203206
final GrpcServiceBuilder grpcServiceBuilder = GrpcService
204207
.builder()
@@ -223,15 +226,20 @@ private void configureGrpcService(ServerBuilder serverBuilder, Buffer<Record<Obj
223226
grpcServiceBuilder.enableUnframedRequests(true);
224227
}
225228

226-
final String path = oTelLogsSourceConfig.getPath().replace("${pipelineName}", pipelineName);
227229
final CreateServer.GRPCServiceConfig<?, ?> grpcServiceConfig = new CreateServer.GRPCServiceConfig<>(oTelLogsGrpcService);
228-
grpcServiceBuilder.addService(
229-
path,
230-
ServerInterceptors.intercept(grpcServiceConfig.getService(), interceptors),
231-
LogsServiceGrpc.getExportMethod());
230+
if (oTelLogsSourceConfig.getPath() != null) {
231+
final String path = oTelLogsSourceConfig.getPath().replace("${pipelineName}", pipelineName);
232+
LOG.info("custom gRPC path: {} ", path);
233+
grpcServiceBuilder.addService(
234+
path,
235+
ServerInterceptors.intercept(grpcServiceConfig.getService(), interceptors),
236+
LogsServiceGrpc.getExportMethod());
237+
} else {
238+
grpcServiceBuilder.addService(ServerInterceptors.intercept(grpcServiceConfig.getService(), interceptors));
239+
}
232240

233241
if (oTelLogsSourceConfig.hasHealthCheck()) {
234-
LOG.info("Health check is enabled");
242+
LOG.info("Health check for gRPC service is enabled");
235243
grpcServiceBuilder.addService(new HealthGrpcService());
236244
}
237245

data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceGrpcTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@
9595
import static org.mockito.ArgumentMatchers.anyBoolean;
9696
import static org.mockito.ArgumentMatchers.anyCollection;
9797
import static org.mockito.ArgumentMatchers.anyInt;
98-
import static org.mockito.ArgumentMatchers.anyString;
9998
import static org.mockito.ArgumentMatchers.eq;
10099
import static org.mockito.ArgumentMatchers.isA;
101100
import static org.mockito.Mockito.doThrow;
@@ -259,7 +258,7 @@ void start_with_Health_configured_includes_HealthCheck_service() throws IOExcept
259258
MockedStatic<GrpcService> grpcServerMock = Mockito.mockStatic(GrpcService.class)) {
260259
armeriaServerMock.when(Server::builder).thenReturn(serverBuilder);
261260
grpcServerMock.when(GrpcService::builder).thenReturn(grpcServiceBuilder);
262-
when(grpcServiceBuilder.addService(anyString(), any(ServerServiceDefinition.class), any())).thenReturn(grpcServiceBuilder);
261+
when(grpcServiceBuilder.addService(any(ServerServiceDefinition.class))).thenReturn(grpcServiceBuilder);
263262
when(grpcServiceBuilder.useClientTimeoutHeader(anyBoolean())).thenReturn(grpcServiceBuilder);
264263
when(grpcServiceBuilder.exceptionHandler(any(GrpcRequestExceptionHandler.class))).thenReturn(grpcServiceBuilder);
265264

@@ -289,7 +288,7 @@ void start_without_Health_configured_does_not_include_HealthCheck_service() thro
289288
MockedStatic<GrpcService> grpcServerMock = Mockito.mockStatic(GrpcService.class)) {
290289
armeriaServerMock.when(Server::builder).thenReturn(serverBuilder);
291290
grpcServerMock.when(GrpcService::builder).thenReturn(grpcServiceBuilder);
292-
when(grpcServiceBuilder.addService(anyString(), any(ServerServiceDefinition.class), any())).thenReturn(grpcServiceBuilder);
291+
when(grpcServiceBuilder.addService(any(ServerServiceDefinition.class))).thenReturn(grpcServiceBuilder);
293292
when(grpcServiceBuilder.useClientTimeoutHeader(anyBoolean())).thenReturn(grpcServiceBuilder);
294293
when(grpcServiceBuilder.exceptionHandler(any(
295294
GrpcRequestExceptionHandler.class))).thenReturn(grpcServiceBuilder);

data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceHttpTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import static org.junit.jupiter.api.Assertions.assertEquals;
1818
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
1919
import static org.junit.jupiter.api.Assertions.assertThrows;
20-
import static org.junit.jupiter.api.Assertions.assertTrue;
2120
import static org.junit.jupiter.params.provider.Arguments.arguments;
2221
import static org.mockito.ArgumentMatchers.any;
2322
import static org.mockito.ArgumentMatchers.anyBoolean;

data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSourceConfigFixture.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,12 @@
44
import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.DEFAULT_REQUEST_TIMEOUT_MS;
55
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigTestData.BASIC_AUTH_PASSWORD;
66
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigTestData.BASIC_AUTH_USERNAME;
7-
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigTestData.CONFIG_GRPC_PATH;
87
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigTestData.CONFIG_HTTP_PATH;
98

109
import java.util.Map;
1110

1211
import org.opensearch.dataprepper.model.configuration.PluginModel;
1312
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
14-
import org.opensearch.dataprepper.plugins.server.RetryInfoConfig;
1513

1614
import com.google.protobuf.InvalidProtocolBufferException;
1715
import com.google.protobuf.util.JsonFormat;
@@ -34,9 +32,7 @@ public static OTelLogsSourceConfig createDefaultConfig() {
3432
public static OTelLogsSourceConfig.OTelLogsSourceConfigBuilder createDefaultConfigBuilder() {
3533
return OTelLogsSourceConfig.builder()
3634
.healthCheck(true)
37-
.path(CONFIG_GRPC_PATH)
3835
.httpPath(CONFIG_HTTP_PATH)
39-
.retryInfo(new RetryInfoConfig())
4036
.port(DEFAULT_PORT)
4137
.enableUnframedRequests(false)
4238
.ssl(false)

e2e-test/log/build.gradle

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ List<LogTestConfiguration> logTestConfigurations = [
4141
'Runs the basic grok end-to-end test.',
4242
'org.opensearch.dataprepper.integration.log.EndToEndBasicLogTest.testPipelineEndToEnd*',
4343
'data-prepper-basic-log',
44-
'basic-grok-e2e-pipeline.yml',
44+
'basic-grok-e2e-otel-logs-source-pipeline.yml',
4545
'data_prepper.yml'
4646
),
4747
new LogTestConfiguration(
@@ -67,7 +67,15 @@ List<LogTestConfiguration> logTestConfigurations = [
6767
'data-prepper-parallel-log',
6868
'parallel-grok-substitute-e2e-pipeline.yml',
6969
'data_prepper.yml'
70-
)
70+
),
71+
new LogTestConfiguration(
72+
'basicLogOtelLogsSourceEndToEndTest',
73+
'Runs the basic grok end-to-end test using the otel logs source.',
74+
'org.opensearch.dataprepper.integration.log.EndToEndOtelLogsSourceTest.testOtelLogsSourcePipelineEndToEnd*',
75+
'data-prepper-otel-logs-source',
76+
'otel-logs-source-pipeline.yml',
77+
'data_prepper.yml'
78+
),
7179
]
7280

7381

@@ -138,7 +146,9 @@ dependencies {
138146
integrationTestImplementation project(':data-prepper-plugins:log-generator-source')
139147
integrationTestImplementation project(':data-prepper-plugins:opensearch')
140148
integrationTestImplementation project(':data-prepper-plugins:aws-plugin-api')
149+
integrationTestImplementation "io.opentelemetry.proto:opentelemetry-proto:${targetOpenTelemetryVersion}"
141150
integrationTestImplementation libs.armeria.core
142151
integrationTestImplementation testLibs.awaitility
143152
integrationTestImplementation libs.opensearch.rhlc
153+
integrationTestImplementation libs.protobuf.util
144154
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.integration.log;
7+
8+
import static org.awaitility.Awaitility.await;
9+
import static org.hamcrest.CoreMatchers.is;
10+
import static org.hamcrest.MatcherAssert.assertThat;
11+
import static org.hamcrest.Matchers.contains;
12+
import static org.hamcrest.Matchers.not;
13+
import static org.junit.jupiter.api.Assertions.assertEquals;
14+
15+
import java.io.IOException;
16+
import java.util.ArrayList;
17+
import java.util.Collections;
18+
import java.util.List;
19+
import java.util.Map;
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.stream.Collectors;
22+
23+
import org.junit.Test;
24+
import org.opensearch.action.admin.indices.refresh.RefreshRequest;
25+
import org.opensearch.action.search.SearchRequest;
26+
import org.opensearch.action.search.SearchResponse;
27+
import org.opensearch.client.RequestOptions;
28+
import org.opensearch.client.RestHighLevelClient;
29+
import org.opensearch.dataprepper.plugins.sink.opensearch.ConnectionConfiguration;
30+
import org.opensearch.dataprepper.plugins.source.loggenerator.ApacheLogFaker;
31+
import org.opensearch.search.SearchHits;
32+
import org.opensearch.search.builder.SearchSourceBuilder;
33+
34+
import com.google.protobuf.ByteString;
35+
import com.google.protobuf.InvalidProtocolBufferException;
36+
import com.google.protobuf.util.JsonFormat;
37+
import com.linecorp.armeria.client.WebClient;
38+
import com.linecorp.armeria.common.HttpData;
39+
import com.linecorp.armeria.common.HttpMethod;
40+
import com.linecorp.armeria.common.HttpStatus;
41+
import com.linecorp.armeria.common.MediaType;
42+
import com.linecorp.armeria.common.RequestHeaders;
43+
import com.linecorp.armeria.common.SessionProtocol;
44+
45+
import io.netty.util.AsciiString;
46+
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
47+
import io.opentelemetry.proto.common.v1.AnyValue;
48+
import io.opentelemetry.proto.logs.v1.LogRecord;
49+
import io.opentelemetry.proto.logs.v1.ResourceLogs;
50+
import io.opentelemetry.proto.logs.v1.ScopeLogs;
51+
52+
public class EndToEndOtelLogsSourceTest {
53+
private static final int SOURCE_PORT = 2021;
54+
private static final String INDEX_NAME = "otel-logs-index";
55+
56+
private final ApacheLogFaker apacheLogFaker = new ApacheLogFaker();
57+
58+
@Test
59+
public void testOtelLogsSourcePipelineEndToEnd() throws InvalidProtocolBufferException {
60+
sendHttpRequestToSourceAndAssertResponse(SOURCE_PORT, createOtelLogsRequest());
61+
final RestHighLevelClient restHighLevelClient = prepareOpenSearchRestHighLevelClient();
62+
final List<Map<String, Object>> retrievedDocs = new ArrayList<>();
63+
64+
makeRequestAndAssertResponse(restHighLevelClient, retrievedDocs);
65+
}
66+
67+
private HttpData createOtelLogsRequest() throws InvalidProtocolBufferException {
68+
ExportLogsServiceRequest exportLogsServiceRequest = ExportLogsServiceRequest.newBuilder().addResourceLogs(
69+
ResourceLogs.newBuilder()
70+
.addScopeLogs(ScopeLogs.newBuilder()
71+
.addLogRecords(LogRecord.newBuilder()
72+
.setBody(AnyValue.newBuilder().setStringValue(apacheLogFaker.generateRandomCommonApacheLog()).build())
73+
.setSeverityNumberValue(1)
74+
.setTimeUnixNano(System.currentTimeMillis() * 1_000_000)
75+
.setTraceId(ByteString.copyFromUtf8("trace-id"))
76+
.setSpanId(ByteString.copyFromUtf8("span-id")
77+
)).build()
78+
)
79+
).build();
80+
81+
return HttpData.copyOf(JsonFormat.printer().print(exportLogsServiceRequest).getBytes());
82+
}
83+
84+
private void makeRequestAndAssertResponse(RestHighLevelClient restHighLevelClient, List<Map<String, Object>> retrievedDocs) {
85+
await().atMost(10, TimeUnit.SECONDS).untilAsserted(
86+
() -> {
87+
refreshIndices(restHighLevelClient);
88+
final SearchRequest searchRequest = new SearchRequest(INDEX_NAME);
89+
searchRequest.source(
90+
SearchSourceBuilder.searchSource().size(100)
91+
);
92+
final SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
93+
final List<Map<String, Object>> foundSources = getSourcesFromSearchHits(searchResponse.getHits());
94+
assertEquals(1, foundSources.size());
95+
retrievedDocs.addAll(foundSources);
96+
}
97+
);
98+
}
99+
100+
private RestHighLevelClient prepareOpenSearchRestHighLevelClient() {
101+
final ConnectionConfiguration.Builder builder = new ConnectionConfiguration.Builder(
102+
Collections.singletonList("https://127.0.0.1:9200"));
103+
builder.withUsername("admin");
104+
builder.withPassword("admin");
105+
builder.withInsecure(true);
106+
return builder.build().createClient(null);
107+
}
108+
109+
private void sendHttpRequestToSourceAndAssertResponse(final int port, final HttpData httpData) {
110+
WebClient.of().execute(RequestHeaders.builder()
111+
.scheme(SessionProtocol.HTTP)
112+
.authority(String.format("127.0.0.1:%d", port))
113+
.method(HttpMethod.POST)
114+
.path("/otel-logs-pipeline/logs")
115+
.contentType(MediaType.JSON_UTF_8)
116+
.build(),
117+
httpData)
118+
.aggregate()
119+
.whenComplete((i, ex) -> {
120+
assertThat(i.status(), is(HttpStatus.OK));
121+
final List<String> headerKeys = i.headers()
122+
.stream()
123+
.map(Map.Entry::getKey)
124+
.map(AsciiString::toString)
125+
.collect(Collectors.toList());
126+
assertThat("Response Header Keys", headerKeys, not(contains("server")));
127+
}).join();
128+
}
129+
130+
private List<Map<String, Object>> getSourcesFromSearchHits(final SearchHits searchHits) {
131+
final List<Map<String, Object>> sources = new ArrayList<>();
132+
searchHits.forEach(hit -> {
133+
Map<String, Object> source = hit.getSourceAsMap();
134+
sources.add(source);
135+
});
136+
return sources;
137+
}
138+
139+
private void refreshIndices(final RestHighLevelClient restHighLevelClient) throws IOException {
140+
final RefreshRequest requestAll = new RefreshRequest();
141+
restHighLevelClient.indices().refresh(requestAll, RequestOptions.DEFAULT);
142+
}
143+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
otel-logs-pipeline:
2+
workers: 2
3+
source:
4+
otel_logs_source:
5+
http_path: "/${pipelineName}/logs"
6+
port: 2021
7+
ssl: false
8+
sink:
9+
- opensearch:
10+
hosts: [ "https://node-0.example.com:9200" ]
11+
username: "admin"
12+
password: "admin"
13+
insecure: true
14+
index: "otel-logs-index"
15+
flush_timeout: 5000

0 commit comments

Comments
 (0)