Skip to content

Commit 83d7b39

Browse files
committed
Add test for unframed requests
Signed-off-by: Tomas Longo <tlongo@sternad.de>
1 parent e9ebc47 commit 83d7b39

1 file changed

Lines changed: 182 additions & 0 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.source.otellogs;
7+
8+
import static com.linecorp.armeria.common.HttpStatus.OK;
9+
import static com.linecorp.armeria.common.HttpStatus.UNSUPPORTED_MEDIA_TYPE;
10+
import static org.hamcrest.MatcherAssert.assertThat;
11+
import static org.hamcrest.Matchers.equalTo;
12+
import static org.hamcrest.Matchers.hasItem;
13+
import static org.hamcrest.Matchers.is;
14+
import static org.hamcrest.Matchers.not;
15+
import static org.hamcrest.Matchers.nullValue;
16+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
17+
import static org.mockito.ArgumentMatchers.any;
18+
import static org.mockito.ArgumentMatchers.anyBoolean;
19+
import static org.mockito.ArgumentMatchers.anyInt;
20+
import static org.mockito.ArgumentMatchers.eq;
21+
import static org.mockito.Mockito.lenient;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.when;
24+
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigFixture.createDefaultConfigBuilder;
25+
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigTestData.CONFIG_GRPC_PATH;
26+
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigTestData.CONFIG_HTTP_PATH;
27+
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.stream.Collectors;
31+
32+
import org.junit.jupiter.api.AfterEach;
33+
import org.junit.jupiter.api.BeforeEach;
34+
import org.junit.jupiter.api.Test;
35+
import org.junit.jupiter.api.extension.ExtendWith;
36+
import org.mockito.Mock;
37+
import org.mockito.junit.jupiter.MockitoExtension;
38+
import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider;
39+
import org.opensearch.dataprepper.metrics.MetricsTestUtil;
40+
import org.opensearch.dataprepper.metrics.PluginMetrics;
41+
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
42+
import org.opensearch.dataprepper.model.configuration.PluginSetting;
43+
import org.opensearch.dataprepper.model.plugin.PluginFactory;
44+
import org.opensearch.dataprepper.model.record.Record;
45+
import org.opensearch.dataprepper.plugins.GrpcBasicAuthenticationProvider;
46+
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
47+
import org.opensearch.dataprepper.plugins.otel.codec.OTelLogsDecoder;
48+
49+
import com.google.protobuf.InvalidProtocolBufferException;
50+
import com.google.protobuf.util.JsonFormat;
51+
import com.linecorp.armeria.client.WebClient;
52+
import com.linecorp.armeria.common.AggregatedHttpResponse;
53+
import com.linecorp.armeria.common.HttpData;
54+
import com.linecorp.armeria.common.HttpMethod;
55+
import com.linecorp.armeria.common.HttpStatus;
56+
import com.linecorp.armeria.common.MediaType;
57+
import com.linecorp.armeria.common.RequestHeaders;
58+
import com.linecorp.armeria.common.RequestHeadersBuilder;
59+
import com.linecorp.armeria.common.SessionProtocol;
60+
import com.linecorp.armeria.server.Server;
61+
import com.linecorp.armeria.server.ServerBuilder;
62+
import com.linecorp.armeria.server.grpc.GrpcService;
63+
import com.linecorp.armeria.server.grpc.GrpcServiceBuilder;
64+
65+
import io.grpc.BindableService;
66+
import io.netty.util.AsciiString;
67+
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
68+
import io.opentelemetry.proto.logs.v1.ResourceLogs;
69+
70+
@ExtendWith(MockitoExtension.class)
71+
class OTelLogsSourceUnframedRequestTestTest {
72+
private static final String TEST_PIPELINE_NAME = "test_pipeline";
73+
74+
@Mock
75+
private ServerBuilder serverBuilder;
76+
77+
@Mock
78+
private Server server;
79+
80+
@Mock
81+
private GrpcServiceBuilder grpcServiceBuilder;
82+
83+
@Mock
84+
private GrpcService grpcService;
85+
86+
@Mock
87+
private PluginFactory pluginFactory;
88+
89+
@Mock
90+
private GrpcBasicAuthenticationProvider authenticationProvider;
91+
92+
@Mock
93+
private BlockingBuffer<Record<Object>> buffer;
94+
95+
private PluginMetrics pluginMetrics;
96+
private PipelineDescription pipelineDescription;
97+
private OTelLogsSource SOURCE;
98+
private static final ExportLogsServiceRequest LOGS_REQUEST = ExportLogsServiceRequest.newBuilder()
99+
.addResourceLogs(ResourceLogs.newBuilder().build()).build();
100+
101+
@BeforeEach
102+
public void beforeEach() {
103+
lenient().when(serverBuilder.service(any(GrpcService.class))).thenReturn(serverBuilder);
104+
lenient().when(serverBuilder.https(anyInt())).thenReturn(serverBuilder);
105+
lenient().when(serverBuilder.build()).thenReturn(server);
106+
107+
lenient().when(grpcServiceBuilder.addService(any(BindableService.class))).thenReturn(grpcServiceBuilder);
108+
lenient().when(grpcServiceBuilder.useClientTimeoutHeader(anyBoolean())).thenReturn(grpcServiceBuilder);
109+
lenient().when(grpcServiceBuilder.useBlockingTaskExecutor(anyBoolean())).thenReturn(grpcServiceBuilder);
110+
lenient().when(grpcServiceBuilder.build()).thenReturn(grpcService);
111+
112+
MetricsTestUtil.initMetrics();
113+
pluginMetrics = PluginMetrics.fromNames("otel_logs", "pipeline");
114+
115+
lenient().when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class))).thenReturn(authenticationProvider);
116+
pipelineDescription = mock(PipelineDescription.class);
117+
when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME);
118+
}
119+
120+
@AfterEach
121+
public void afterEach() {
122+
SOURCE.stop();
123+
}
124+
125+
private void configureSource(OTelLogsSourceConfig config) {
126+
SOURCE = new OTelLogsSource(config, pluginMetrics, pluginFactory, pipelineDescription);
127+
assertInstanceOf(OTelLogsDecoder.class, SOURCE.getDecoder());
128+
}
129+
130+
private RequestHeadersBuilder getDefaultRequestHeadersBuilder() {
131+
return RequestHeaders.builder()
132+
.scheme(SessionProtocol.HTTP)
133+
.authority("127.0.0.1:21892")
134+
.method(HttpMethod.POST)
135+
.path(CONFIG_HTTP_PATH)
136+
.contentType(MediaType.JSON_UTF_8);
137+
}
138+
139+
@Test
140+
void unframedRequests_unframedRequestsAreEnabledAndHttpRequestIsSentToGrpcEndpoint_returns200() throws InvalidProtocolBufferException {
141+
configureSource(createDefaultConfigBuilder().enableUnframedRequests(true).build());
142+
SOURCE.start(buffer);
143+
144+
WebClient.of().execute(getDefaultRequestHeadersBuilder()
145+
.path(CONFIG_GRPC_PATH)
146+
.build(),
147+
HttpData.copyOf(JsonFormat.printer().print(LOGS_REQUEST).getBytes()))
148+
.aggregate()
149+
.whenComplete((response, throwable) -> assertSecureResponseWithStatusCode(response, OK, throwable))
150+
.join();
151+
}
152+
153+
@Test
154+
void unframedRequests_unframeRequestsAreDisabledAndHttpRequestIsSentToGrpcEndpoint_returns415() throws InvalidProtocolBufferException {
155+
configureSource(createDefaultConfigBuilder().enableUnframedRequests(false).build());
156+
SOURCE.start(buffer);
157+
158+
WebClient.of().execute(getDefaultRequestHeadersBuilder()
159+
.path(CONFIG_GRPC_PATH)
160+
.build(),
161+
HttpData.copyOf(JsonFormat.printer().print(LOGS_REQUEST).getBytes()))
162+
.aggregate()
163+
.whenComplete((response, throwable) -> assertSecureResponseWithStatusCode(response, UNSUPPORTED_MEDIA_TYPE, throwable))
164+
.join();
165+
}
166+
167+
private void assertSecureResponseWithStatusCode(final AggregatedHttpResponse response,
168+
final HttpStatus expectedStatus,
169+
final Throwable throwable) {
170+
assertThat("Http Status", response.status(), equalTo(expectedStatus));
171+
assertThat("Http Response Throwable", throwable, is(nullValue()));
172+
173+
final List<String> headerKeys = response.headers()
174+
.stream()
175+
.map(Map.Entry::getKey)
176+
.map(AsciiString::toString)
177+
.map(String::toLowerCase)
178+
.collect(Collectors.toList());
179+
assertThat("Response Header Keys", headerKeys, not(hasItem("server")));
180+
}
181+
182+
}

0 commit comments

Comments
 (0)