Skip to content

Commit 861369a

Browse files
authored
Fix OTEL trace source broken by PR 5322 (opensearch-project#6494)
* Fix OTEL trace source broken by PR 5322 Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Modified HttpService to use appropriate Decoder base on output_format Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> --------- Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
1 parent 49ce4f5 commit 861369a

7 files changed

Lines changed: 443 additions & 15 deletions

File tree

data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
2727
import org.opensearch.dataprepper.plugins.source.oteltrace.certificate.CertificateProviderFactory;
2828
import org.opensearch.dataprepper.plugins.otel.codec.OTelTraceDecoder;
29+
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
30+
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoOpensearchCodec;
31+
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoStandardCodec;
32+
import org.opensearch.dataprepper.plugins.otel.codec.OTelOutputFormat;
2933
import org.opensearch.dataprepper.plugins.source.oteltrace.grpc.GrpcService;
3034
import org.opensearch.dataprepper.plugins.source.oteltrace.http.HttpService;
3135
import org.slf4j.Logger;
@@ -88,11 +92,12 @@ public void start(Buffer<Record<Object>> buffer) {
8892
configureTLS(serverBuilder);
8993
configureTaskExecutor(serverBuilder);
9094

91-
configureGrpcService(serverBuilder, buffer);
95+
final OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder = (oTelTraceSourceConfig.getOutputFormat() == OTelOutputFormat.OPENSEARCH) ? new OTelProtoOpensearchCodec.OTelProtoDecoder() : new OTelProtoStandardCodec.OTelProtoDecoder();
96+
configureGrpcService(serverBuilder, oTelProtoDecoder, buffer);
9297

9398
// needed until clarified if unframedRequests should survive
9499
if (!oTelTraceSourceConfig.enableUnframedRequests()) {
95-
configureHttpService(serverBuilder, buffer);
100+
configureHttpService(serverBuilder, oTelProtoDecoder, buffer);
96101
}
97102

98103
server = serverBuilder.build();
@@ -126,8 +131,8 @@ private void handleExecutionException(ExecutionException ex) {
126131
}
127132
}
128133

129-
private void configureGrpcService(ServerBuilder serverBuilder, Buffer<Record<Object>> buffer) {
130-
com.linecorp.armeria.server.grpc.GrpcService grpcService = new GrpcService(pluginFactory, oTelTraceSourceConfig, pluginMetrics, pipelineName).create(buffer, serverBuilder);
134+
private void configureGrpcService(ServerBuilder serverBuilder, final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder, Buffer<Record<Object>> buffer) {
135+
com.linecorp.armeria.server.grpc.GrpcService grpcService = new GrpcService(pluginFactory, otelProtoDecoder, oTelTraceSourceConfig, pluginMetrics, pipelineName).create(buffer, serverBuilder);
131136

132137
if (CompressionOption.NONE.equals(oTelTraceSourceConfig.getCompression())) {
133138
serverBuilder.service(grpcService);
@@ -136,8 +141,8 @@ private void configureGrpcService(ServerBuilder serverBuilder, Buffer<Record<Obj
136141
}
137142
}
138143

139-
private void configureHttpService(ServerBuilder serverBuilder, Buffer<Record<Object>> buffer) {
140-
new HttpService(pluginMetrics, oTelTraceSourceConfig, pluginFactory).create(serverBuilder, buffer);
144+
private void configureHttpService(ServerBuilder serverBuilder, final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder, Buffer<Record<Object>> buffer) {
145+
new HttpService(pluginMetrics, otelProtoDecoder, oTelTraceSourceConfig, pluginFactory).create(serverBuilder, buffer);
141146
}
142147

143148
private void configureHeadersAndHealthCheck(ServerBuilder serverBuilder) {

data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/grpc/GrpcService.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import org.opensearch.dataprepper.model.configuration.PluginSetting;
1515
import org.opensearch.dataprepper.model.plugin.PluginFactory;
1616
import org.opensearch.dataprepper.model.record.Record;
17-
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoOpensearchCodec;
17+
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
1818
import org.opensearch.dataprepper.plugins.server.HealthGrpcService;
1919
import org.opensearch.dataprepper.plugins.server.RetryInfoConfig;
2020
import org.opensearch.dataprepper.plugins.source.oteltrace.OTelTraceGrpcService;
@@ -45,12 +45,14 @@ public class GrpcService {
4545
public static final String REGEX_HEALTH = "regex:^/(?!health$).*$";
4646

4747
private final OTelTraceSourceConfig oTelTraceSourceConfig;
48+
private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder;
4849
private final GrpcAuthenticationProvider authenticationProvider;
4950
private final PluginMetrics pluginMetrics;
5051
private final String pipelineName;
5152

52-
public GrpcService(PluginFactory pluginFactory, OTelTraceSourceConfig oTelTraceSourceConfig, PluginMetrics pluginMetrics, String pipelineName) {
53+
public GrpcService(PluginFactory pluginFactory,final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder, final OTelTraceSourceConfig oTelTraceSourceConfig, final PluginMetrics pluginMetrics, final String pipelineName) {
5354
this.oTelTraceSourceConfig = oTelTraceSourceConfig;
55+
this.otelProtoDecoder = otelProtoDecoder;
5456
this.pluginMetrics = pluginMetrics;
5557
this.pipelineName = pipelineName;
5658
this.authenticationProvider = createAuthenticationProvider(pluginFactory, oTelTraceSourceConfig);
@@ -60,7 +62,7 @@ public com.linecorp.armeria.server.grpc.GrpcService create(Buffer<Record<Object>
6062

6163
final OTelTraceGrpcService oTelTraceGrpcService = new OTelTraceGrpcService(
6264
(int)(oTelTraceSourceConfig.getRequestTimeoutInMillis() * 0.8),
63-
new OTelProtoOpensearchCodec.OTelProtoDecoder(),
65+
otelProtoDecoder,
6466
buffer,
6567
pluginMetrics,
6668
null

data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/http/ArmeriaHttpService.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.opensearch.dataprepper.model.record.Record;
1515
import org.opensearch.dataprepper.model.trace.Span;
1616
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
17-
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoOpensearchCodec;
1817
import org.slf4j.Logger;
1918
import org.slf4j.LoggerFactory;
2019

@@ -50,9 +49,9 @@ public class ArmeriaHttpService {
5049
private final DistributionSummary payloadSizeSummary;
5150
private final Timer requestProcessDuration;
5251

53-
public ArmeriaHttpService(Buffer<Record<Object>> buffer, final PluginMetrics pluginMetrics, final int bufferWriteTimeoutInMillis) {
52+
public ArmeriaHttpService(Buffer<Record<Object>> buffer, final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder, final PluginMetrics pluginMetrics, final int bufferWriteTimeoutInMillis) {
5453
this.buffer = buffer;
55-
this.oTelProtoDecoder = new OTelProtoOpensearchCodec.OTelProtoDecoder();
54+
this.oTelProtoDecoder = otelProtoDecoder;
5655
this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis;
5756

5857
requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED);

data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/http/HttpService.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.opensearch.dataprepper.model.record.Record;
1515
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
1616
import org.opensearch.dataprepper.plugins.server.RetryInfoConfig;
17+
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
1718
import org.opensearch.dataprepper.plugins.source.oteltrace.OTelTraceSourceConfig;
1819
import org.slf4j.Logger;
1920
import org.slf4j.LoggerFactory;
@@ -27,9 +28,11 @@ public class HttpService {
2728

2829
private final PluginMetrics pluginMetrics;
2930
private final OTelTraceSourceConfig oTelTraceSourceConfig;
31+
private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder;
3032
private final PluginFactory pluginFactory;
3133

32-
public HttpService(PluginMetrics pluginMetrics, OTelTraceSourceConfig oTelTraceSourceConfig, PluginFactory pluginFactory) {
34+
public HttpService(PluginMetrics pluginMetrics, final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder, final OTelTraceSourceConfig oTelTraceSourceConfig, final PluginFactory pluginFactory) {
35+
this.otelProtoDecoder = otelProtoDecoder;
3336
this.pluginMetrics = pluginMetrics;
3437
this.oTelTraceSourceConfig = oTelTraceSourceConfig;
3538
this.pluginFactory = pluginFactory;
@@ -39,7 +42,7 @@ public ArmeriaHttpService create(ServerBuilder serverBuilder, Buffer<Record<Obje
3942
RetryInfoConfig retryInfo = oTelTraceSourceConfig.getRetryInfo() != null
4043
? oTelTraceSourceConfig.getRetryInfo()
4144
: DEFAULT_RETRY_INFO;
42-
ArmeriaHttpService httpService = new ArmeriaHttpService(buffer, pluginMetrics, oTelTraceSourceConfig.getRequestTimeoutInMillis());
45+
ArmeriaHttpService httpService = new ArmeriaHttpService(buffer, otelProtoDecoder, pluginMetrics, oTelTraceSourceConfig.getRequestTimeoutInMillis());
4346
HttpExceptionHandler httpExceptionHandler = new HttpExceptionHandler(pluginMetrics, retryInfo.getMinDelay(), retryInfo.getMaxDelay());
4447

4548
configureAuthentication(serverBuilder);

data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@
3636
import io.opentelemetry.proto.trace.v1.ResourceSpans;
3737
import io.opentelemetry.proto.trace.v1.ScopeSpans;
3838
import io.opentelemetry.proto.trace.v1.Span;
39+
import io.opentelemetry.proto.common.v1.AnyValue;
40+
import io.opentelemetry.proto.common.v1.KeyValue;
41+
import io.opentelemetry.proto.resource.v1.Resource;
42+
import org.opensearch.dataprepper.model.record.Record;
3943
import org.apache.commons.io.IOUtils;
4044
import org.junit.jupiter.api.AfterEach;
4145
import org.junit.jupiter.api.BeforeEach;
@@ -65,6 +69,7 @@
6569
import org.opensearch.dataprepper.plugins.HttpBasicArmeriaHttpAuthenticationProvider;
6670
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
6771
import org.opensearch.dataprepper.plugins.certificate.model.Certificate;
72+
import org.opensearch.dataprepper.plugins.otel.codec.OTelOutputFormat;
6873
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
6974
import org.opensearch.dataprepper.plugins.server.HealthGrpcService;
7075
import org.opensearch.dataprepper.plugins.server.RetryInfoConfig;
@@ -77,6 +82,7 @@
7782
import java.nio.file.Path;
7883
import java.time.Duration;
7984
import java.util.Base64;
85+
import java.util.Collection;
8086
import java.util.Collections;
8187
import java.util.HashMap;
8288
import java.util.List;
@@ -103,6 +109,7 @@
103109
import static org.mockito.ArgumentMatchers.anyInt;
104110
import static org.mockito.ArgumentMatchers.eq;
105111
import static org.mockito.ArgumentMatchers.isA;
112+
import static org.mockito.Mockito.doAnswer;
106113
import static org.mockito.Mockito.lenient;
107114
import static org.mockito.Mockito.mock;
108115
import static org.mockito.Mockito.never;
@@ -119,6 +126,9 @@ class OTelTraceSourceTest {
119126
private static final String USERNAME = "test_user";
120127
private static final String PASSWORD = "test_password";
121128
private static final String TEST_PATH = "${pipelineName}/v1/traces";
129+
private static final String RESOURCE_ATTR_SERVICE_KEY = "service.name";
130+
private static final String TRACE_SERVICE_NAME = "TestTraceServiceName";
131+
private static final int TEST_RESOURCE_DROPPED_ATTRIBUTES_COUNT = 11;
122132
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new JavaTimeModule());
123133
private static final String TEST_PIPELINE_NAME = "test_pipeline";
124134
private static final RetryInfoConfig TEST_RETRY_INFO = new RetryInfoConfig(Duration.ofMillis(50), Duration.ofMillis(2000));
@@ -169,6 +179,7 @@ class OTelTraceSourceTest {
169179
private PluginMetrics pluginMetrics;
170180
private PipelineDescription pipelineDescription;
171181
private OTelTraceSource SOURCE;
182+
private List<Record<org.opensearch.dataprepper.model.trace.Span>> recordsReceived;
172183

173184
@BeforeEach
174185
void beforeEach() {
@@ -222,7 +233,54 @@ private void configureObjectUnderTest() {
222233
}
223234

224235
@Test
225-
void testHttpFullJsonWithCustomPathAndAuthHeader_with_successful_response() throws InvalidProtocolBufferException {
236+
void testHttpFullJsonWithCustomPathAndAuthHeader_with_successful_response() throws Exception {
237+
doAnswer((a)-> {
238+
recordsReceived = ((Collection<Record<org.opensearch.dataprepper.model.trace.Span>>)a.getArgument(0)).stream().collect(Collectors.toList());
239+
return null;
240+
}).when(buffer).writeAll(any(), any(Integer.class));
241+
when(httpBasicAuthenticationConfig.getUsername()).thenReturn(USERNAME);
242+
when(httpBasicAuthenticationConfig.getPassword()).thenReturn(PASSWORD);
243+
final GrpcAuthenticationProvider grpcAuthenticationProvider = new GrpcBasicAuthenticationProvider(httpBasicAuthenticationConfig);
244+
245+
when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class)))
246+
.thenReturn(grpcAuthenticationProvider);
247+
when(oTelTraceSourceConfig.getAuthentication()).thenReturn(new PluginModel("http_basic",
248+
Map.of(
249+
"username", USERNAME,
250+
"password", PASSWORD
251+
)));
252+
when(oTelTraceSourceConfig.enableUnframedRequests()).thenReturn(true);
253+
when(oTelTraceSourceConfig.getPath()).thenReturn(TEST_PATH);
254+
when(oTelTraceSourceConfig.getOutputFormat()).thenReturn(OTelOutputFormat.OPENSEARCH);
255+
256+
configureObjectUnderTest();
257+
SOURCE.start(buffer);
258+
259+
final String encodeToString = Base64.getEncoder()
260+
.encodeToString(String.format("%s:%s", USERNAME, PASSWORD).getBytes(StandardCharsets.UTF_8));
261+
262+
final String transformedPath = "/" + TEST_PIPELINE_NAME + "/v1/traces";
263+
264+
WebClient.of().prepare()
265+
.post("http://127.0.0.1:21890" + transformedPath)
266+
.content(MediaType.JSON_UTF_8, JsonFormat.printer().print(createExportTraceRequest()).getBytes())
267+
.header("Authorization", "Basic " + encodeToString)
268+
.execute()
269+
.aggregate()
270+
.whenComplete((response, throwable) -> assertSecureResponseWithStatusCode(response, HttpStatus.OK, throwable))
271+
.join();
272+
273+
assertThat(recordsReceived.size(), equalTo(1));
274+
org.opensearch.dataprepper.model.trace.Span span = recordsReceived.get(0).getData();
275+
assertThat(span.get("attributes/resource.attributes.service@name", String.class), equalTo(TRACE_SERVICE_NAME));
276+
}
277+
278+
@Test
279+
void testHttpFullJsonWithCustomPathAndAuthHeader_using_otel_format_with_successful_response() throws Exception {
280+
doAnswer((a)-> {
281+
recordsReceived = ((Collection<Record<org.opensearch.dataprepper.model.trace.Span>>)a.getArgument(0)).stream().collect(Collectors.toList());
282+
return null;
283+
}).when(buffer).writeAll(any(), any(Integer.class));
226284
when(httpBasicAuthenticationConfig.getUsername()).thenReturn(USERNAME);
227285
when(httpBasicAuthenticationConfig.getPassword()).thenReturn(PASSWORD);
228286
final GrpcAuthenticationProvider grpcAuthenticationProvider = new GrpcBasicAuthenticationProvider(httpBasicAuthenticationConfig);
@@ -236,6 +294,7 @@ void testHttpFullJsonWithCustomPathAndAuthHeader_with_successful_response() thro
236294
)));
237295
when(oTelTraceSourceConfig.enableUnframedRequests()).thenReturn(true);
238296
when(oTelTraceSourceConfig.getPath()).thenReturn(TEST_PATH);
297+
when(oTelTraceSourceConfig.getOutputFormat()).thenReturn(OTelOutputFormat.OTEL);
239298

240299
configureObjectUnderTest();
241300
SOURCE.start(buffer);
@@ -253,6 +312,10 @@ void testHttpFullJsonWithCustomPathAndAuthHeader_with_successful_response() thro
253312
.aggregate()
254313
.whenComplete((response, throwable) -> assertSecureResponseWithStatusCode(response, HttpStatus.OK, throwable))
255314
.join();
315+
316+
assertThat(recordsReceived.size(), equalTo(1));
317+
org.opensearch.dataprepper.model.trace.Span span = recordsReceived.get(0).getData();
318+
assertThat(span.get("resource/attributes/service.name", String.class), equalTo(TRACE_SERVICE_NAME));
256319
}
257320

258321
@Test
@@ -792,8 +855,17 @@ private ExportTraceServiceRequest createExportTraceRequest() {
792855
.setEndTimeUnixNano(101)
793856
.setTraceState("SUCCESS").build();
794857

858+
final Resource resource = Resource.newBuilder()
859+
.setDroppedAttributesCount(TEST_RESOURCE_DROPPED_ATTRIBUTES_COUNT)
860+
.addAttributes(KeyValue.newBuilder()
861+
.setKey(RESOURCE_ATTR_SERVICE_KEY)
862+
.setValue(AnyValue.newBuilder().setStringValue(TRACE_SERVICE_NAME).build())
863+
)
864+
.build();
865+
795866
return ExportTraceServiceRequest.newBuilder()
796867
.addResourceSpans(ResourceSpans.newBuilder()
868+
.setResource(resource)
797869
.addScopeSpans(ScopeSpans.newBuilder().addSpans(testSpan)).build())
798870
.build();
799871
}

0 commit comments

Comments
 (0)