Skip to content

Commit 148ee32

Browse files
committed
Add e2e test for protobuf requests
Signed-off-by: Tomas Longo <tlongo@sternad.de>
1 parent 75a5ef7 commit 148ee32

3 files changed

Lines changed: 41 additions & 7 deletions

File tree

data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/http/ArmeriaHttpService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
import org.slf4j.LoggerFactory;
1818

1919
import com.linecorp.armeria.server.ServiceRequestContext;
20-
import com.linecorp.armeria.server.annotation.Consumes;
20+
import com.linecorp.armeria.server.annotation.ConsumesJson;
21+
import com.linecorp.armeria.server.annotation.ConsumesProtobuf;
2122
import com.linecorp.armeria.server.annotation.Post;
2223

2324
import io.micrometer.core.instrument.Counter;
@@ -57,7 +58,8 @@ public ArmeriaHttpService(Buffer<Record<Object>> buffer, final PluginMetrics plu
5758

5859
// no path provided. Will be set by config.
5960
@Post("")
60-
@Consumes(value = "application/json")
61+
@ConsumesJson
62+
@ConsumesProtobuf
6163
public ExportLogsServiceResponse exportLog(ExportLogsServiceRequest request) {
6264
requestsReceivedCounter.increment();
6365
payloadSizeSummary.record(request.getSerializedSize());

e2e-test/log/build.gradle

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,14 @@ List<LogTestConfiguration> logTestConfigurations = [
9292
'otel-logs-source-unframed-requests-pipeline.yml',
9393
'data_prepper.yml'
9494
),
95+
new LogTestConfiguration(
96+
'basicLogOtelLogsSourceProtobufPayloadEndToEndTest',
97+
'Runs the basic end-to-end test using the otel logs source grpc service with unframed requests enabled.',
98+
'org.opensearch.dataprepper.integration.log.EndToEndOtelLogsSourceTest.testOtelLogsSourcePipelineWithProtobufPayloadEndToEnd*',
99+
'data-prepper-otel-logs-source',
100+
'otel-logs-source-pipeline.yml',
101+
'data_prepper.yml'
102+
),
95103
]
96104

97105

e2e-test/log/src/integrationTest/java/org/opensearch/dataprepper/integration/log/EndToEndOtelLogsSourceTest.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,20 +56,27 @@ public class EndToEndOtelLogsSourceTest {
5656

5757
@Test
5858
public void testOtelLogsSourcePipelineEndToEnd() throws InvalidProtocolBufferException {
59-
ingestLogs("/otel-logs-pipeline/logs");
59+
ingestLogs("/otel-logs-pipeline/logs", createOtelLogsJsonRequest());
6060

6161
searchForLogsAndAssert();
6262
}
6363

6464
@Test
6565
public void testOtelLogsSourceWithUnframedRequestsPipelineEndToEnd() throws InvalidProtocolBufferException {
66-
ingestLogs("/opentelemetry.proto.collector.logs.v1.LogsService/Export");
66+
ingestLogs("/opentelemetry.proto.collector.logs.v1.LogsService/Export", createOtelLogsJsonRequest());
67+
68+
searchForLogsAndAssert();
69+
}
70+
71+
@Test
72+
public void testOtelLogsSourcePipelineWithProtobufPayloadEndToEnd() throws InvalidProtocolBufferException {
73+
ingestLogs("/otel-logs-pipeline/logs", createOtelLogsProtobufRequest());
6774

6875
searchForLogsAndAssert();
6976
}
7077

7178

72-
private HttpData createOtelLogsHttpRequest() throws InvalidProtocolBufferException {
79+
private HttpData createOtelLogsJsonRequest() throws InvalidProtocolBufferException {
7380
ExportLogsServiceRequest exportLogsServiceRequest = ExportLogsServiceRequest.newBuilder().addResourceLogs(
7481
ResourceLogs.newBuilder()
7582
.addScopeLogs(ScopeLogs.newBuilder()
@@ -86,6 +93,23 @@ private HttpData createOtelLogsHttpRequest() throws InvalidProtocolBufferExcepti
8693
return HttpData.copyOf(JsonFormat.printer().print(exportLogsServiceRequest).getBytes());
8794
}
8895

96+
private HttpData createOtelLogsProtobufRequest() throws InvalidProtocolBufferException {
97+
ExportLogsServiceRequest exportLogsServiceRequest = ExportLogsServiceRequest.newBuilder().addResourceLogs(
98+
ResourceLogs.newBuilder()
99+
.addScopeLogs(ScopeLogs.newBuilder()
100+
.addLogRecords(LogRecord.newBuilder()
101+
.setBody(AnyValue.newBuilder().setStringValue(apacheLogFaker.generateRandomCommonApacheLog()).build())
102+
.setSeverityNumberValue(1)
103+
.setTimeUnixNano(System.currentTimeMillis() * 1_000_000)
104+
.setTraceId(ByteString.copyFromUtf8("trace-id"))
105+
.setSpanId(ByteString.copyFromUtf8("span-id")
106+
)).build()
107+
)
108+
).build();
109+
110+
return HttpData.copyOf(exportLogsServiceRequest.toByteArray());
111+
}
112+
89113
private void searchForLogsAndAssert() {
90114
await().atMost(10, TimeUnit.SECONDS).untilAsserted(
91115
() -> {
@@ -109,7 +133,7 @@ private RestHighLevelClient createOpenSearchClient() {
109133
.createClient(null);
110134
}
111135

112-
private void ingestLogs(String path) throws InvalidProtocolBufferException {
136+
private void ingestLogs(String path, HttpData payload) throws InvalidProtocolBufferException {
113137
RequestHeaders headers = RequestHeaders.builder()
114138
.scheme(SessionProtocol.HTTP)
115139
.authority(String.format("127.0.0.1:%d", SOURCE_PORT))
@@ -118,7 +142,7 @@ private void ingestLogs(String path) throws InvalidProtocolBufferException {
118142
.contentType(MediaType.JSON_UTF_8)
119143
.build();
120144

121-
WebClient.of().execute(headers, createOtelLogsHttpRequest())
145+
WebClient.of().execute(headers, payload)
122146
.aggregate()
123147
.whenComplete((i, ex) -> assertThat(i.status(), is(HttpStatus.OK)))
124148
.join();

0 commit comments

Comments
 (0)