Skip to content

Commit bf397f3

Browse files
TomasLongoTomas Longo
andauthored
Introduce otlp/http support in OTelTraceSource (#5322)
* Update OpenTelemetryProto to 1.3.2-alpha and refactor scope usage Signed-off-by: Tomas Longo <tomas.longo@sap.com> * [WIP] Process ExportTraceServiceRequest in http service Signed-off-by: Tomas Longo <tomas.longo@sap.com> * [WIP] Move gRPC request tests to own class Signed-off-by: Tomas Longo <tomas.longo@sap.com> * [WIP] Cleanup Signed-off-by: Tomas Longo <tomas.longo@sap.com> * [WIP] Separate concerns when it comes to configuring the server/services Signed-off-by: Tomas Longo <tomas.longo@sap.com> * [WIP] Use retry calculator to provide backoff info Signed-off-by: Tomas Longo <tomas.longo@sap.com> * [WIP] Add metrics to http exception handler Signed-off-by: Tomas Longo <tomas.longo@sap.com> * [WIP] Revert accidental changes Signed-off-by: Tomas Longo <tomas.longo@sap.com> * [WIP] Infer protocol from config. Isolate tests regarding unframed requests Signed-off-by: Tomas Longo <tomas.longo@sap.com> * [WIP] Add basic auth to http service Signed-off-by: Tomas Longo <tomas.longo@sap.com> * [WIP] Move configuration of http service into own class Signed-off-by: Tomas Longo <tomas.longo@sap.com> * [WIP] Add pr description Signed-off-by: Tomas Longo <tomas.longo@sap.com> * [WIP] Remove pr description Signed-off-by: Tomas Longo <tomas.longo@sap.com> * [WIP] Fix checkstyle findings Signed-off-by: Tomas Longo <tomas.longo@sap.com> * [WIP] Fix issue with http service being enabled, while grpc service accepts unframed requests Signed-off-by: Tomas Longo <tomas.longo@sap.com> * Refactor EndToEndRawSpanTest Signed-off-by: Tomas Longo <tomas.longo@sap.com> * Create ArmeriaAuthenticationProvider via PluginFactory Signed-off-by: Tomas Longo <tomas.longo@sap.com> * Rename GrpcRetryInfoCalculator Signed-off-by: Tomas Longo <tomas.longo@sap.com> * Create test for invalid payload Signed-off-by: Tomas Longo <tomas.longo@sap.com> * Add test for healthcheck Signed-off-by: Tomas Longo <tomas.longo@sap.com> * Add test for http exception handler Signed-off-by: Tomas Longo <tomas.longo@sap.com> * Remove tests Signed-off-by: Tomas Longo <tomas.longo@sap.com> * Fix missing imports Signed-off-by: Tomas Longo <tlongo@sternad.de> * Remove unused imports Signed-off-by: Tomas Longo <tlongo@sternad.de> * Fix imports Signed-off-by: Tomas Longo <tlongo@sternad.de> * Remove/edit todos Signed-off-by: Tomas Longo <tlongo@sternad.de> * Remove accidentally added default password Signed-off-by: Tomas Longo <tlongo@sternad.de> * Declare assertJ as test lib and reference it from e2e test Signed-off-by: Tomas Longo <tlongo@sternad.de> * Fix merge error Signed-off-by: Tomas Longo <tlongo@sternad.de> * Use insecure ssl connection in test Signed-off-by: Tomas Longo <tlongo@sternad.de> --------- Signed-off-by: Tomas Longo <tomas.longo@sap.com> Signed-off-by: Tomas Longo <tlongo@sternad.de> Co-authored-by: Tomas Longo <tomas.longo@sap.com>
1 parent ddd0ce0 commit bf397f3

19 files changed

Lines changed: 2037 additions & 621 deletions

File tree

data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRequestExceptionHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,14 @@ public class GrpcRequestExceptionHandler implements GoogleGrpcExceptionHandlerFu
3939
private final Counter badRequestsCounter;
4040
private final Counter requestsTooLargeCounter;
4141
private final Counter internalServerErrorCounter;
42-
private final GrpcRetryInfoCalculator retryInfoCalculator;
42+
private final RetryInfoCalculator retryInfoCalculator;
4343

4444
public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics, Duration retryInfoMinDelay, Duration retryInfoMaxDelay) {
4545
requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS);
4646
badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS);
4747
requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE);
4848
internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR);
49-
retryInfoCalculator = new GrpcRetryInfoCalculator(retryInfoMinDelay, retryInfoMaxDelay);
49+
retryInfoCalculator = new RetryInfoCalculator(retryInfoMinDelay, retryInfoMaxDelay);
5050
}
5151

5252
@Override

data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRetryInfoCalculator.java renamed to data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/RetryInfoCalculator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,15 @@
66
import java.time.Instant;
77
import java.util.concurrent.atomic.AtomicReference;
88

9-
class GrpcRetryInfoCalculator {
9+
public class RetryInfoCalculator {
1010

1111
private final Duration minimumDelay;
1212
private final Duration maximumDelay;
1313

1414
private final AtomicReference<Instant> lastTimeCalled;
1515
private final AtomicReference<Duration> nextDelay;
1616

17-
GrpcRetryInfoCalculator(Duration minimumDelay, Duration maximumDelay) {
17+
public RetryInfoCalculator(Duration minimumDelay, Duration maximumDelay) {
1818
this.minimumDelay = minimumDelay;
1919
this.maximumDelay = maximumDelay;
2020
// Create a cushion so that the calculator treats a first quick exception (after prepper startup) as normal request (e.g. does not calculate a backoff)
@@ -34,7 +34,7 @@ private static com.google.protobuf.Duration.Builder mapDuration(Duration duratio
3434
return com.google.protobuf.Duration.newBuilder().setSeconds(duration.getSeconds()).setNanos(duration.getNano());
3535
}
3636

37-
RetryInfo createRetryInfo() {
37+
public RetryInfo createRetryInfo() {
3838
Instant now = Instant.now();
3939
// Is the last time we got called longer ago than the next delay?
4040
if (lastTimeCalled.getAndSet(now).isBefore(now.minus(nextDelay.get()))) {

data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRetryInfoCalculatorTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,16 @@ public class GrpcRetryInfoCalculatorTest {
1212

1313
@Test
1414
public void testMinimumDelayOnFirstCall() {
15-
RetryInfo retryInfo = new GrpcRetryInfoCalculator(Duration.ofMillis(100), Duration.ofSeconds(1)).createRetryInfo();
15+
RetryInfo retryInfo = new RetryInfoCalculator(Duration.ofMillis(100), Duration.ofSeconds(1)).createRetryInfo();
1616

1717
assertThat(retryInfo.getRetryDelay().getNanos(), equalTo(100_000_000));
1818
assertThat(retryInfo.getRetryDelay().getSeconds(), equalTo(0L));
1919
}
2020

2121
@Test
2222
public void testExponentialBackoff() {
23-
GrpcRetryInfoCalculator calculator =
24-
new GrpcRetryInfoCalculator(Duration.ofSeconds(1), Duration.ofSeconds(10));
23+
RetryInfoCalculator calculator =
24+
new RetryInfoCalculator(Duration.ofSeconds(1), Duration.ofSeconds(10));
2525
RetryInfo retryInfo1 = calculator.createRetryInfo();
2626
RetryInfo retryInfo2 = calculator.createRetryInfo();
2727
RetryInfo retryInfo3 = calculator.createRetryInfo();
@@ -35,8 +35,8 @@ public void testExponentialBackoff() {
3535

3636
@Test
3737
public void testUsesMaximumAsLongestDelay() {
38-
GrpcRetryInfoCalculator calculator =
39-
new GrpcRetryInfoCalculator(Duration.ofSeconds(1), Duration.ofSeconds(2));
38+
RetryInfoCalculator calculator =
39+
new RetryInfoCalculator(Duration.ofSeconds(1), Duration.ofSeconds(2));
4040
RetryInfo retryInfo1 = calculator.createRetryInfo();
4141
RetryInfo retryInfo2 = calculator.createRetryInfo();
4242
RetryInfo retryInfo3 = calculator.createRetryInfo();
@@ -49,8 +49,8 @@ public void testUsesMaximumAsLongestDelay() {
4949
@Test
5050
public void testResetAfterDelayWearsOff() throws InterruptedException {
5151
int minDelayNanos = 1_000_000;
52-
GrpcRetryInfoCalculator calculator =
53-
new GrpcRetryInfoCalculator(Duration.ofNanos(minDelayNanos), Duration.ofSeconds(1));
52+
RetryInfoCalculator calculator =
53+
new RetryInfoCalculator(Duration.ofNanos(minDelayNanos), Duration.ofSeconds(1));
5454

5555
RetryInfo retryInfo1 = calculator.createRetryInfo();
5656
RetryInfo retryInfo2 = calculator.createRetryInfo();
@@ -66,8 +66,8 @@ public void testResetAfterDelayWearsOff() throws InterruptedException {
6666

6767
@Test
6868
public void testQuickFirstExceptionDoesNotTriggerBackoffCalculationEvenWithLongMinDelay() throws InterruptedException {
69-
GrpcRetryInfoCalculator calculator =
70-
new GrpcRetryInfoCalculator(Duration.ofSeconds(10), Duration.ofSeconds(20));
69+
RetryInfoCalculator calculator =
70+
new RetryInfoCalculator(Duration.ofSeconds(10), Duration.ofSeconds(20));
7171

7272
RetryInfo retryInfo1 = calculator.createRetryInfo();
7373
RetryInfo retryInfo2 = calculator.createRetryInfo();

data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,6 @@ void testHttpFullJsonWithCustomPathAndUnframedRequests() throws InvalidProtocolB
350350
.join();
351351
}
352352

353-
354353
@Test
355354
void testHttpFullJsonWithCustomPathAndAuthHeader_with_successful_response() throws InvalidProtocolBufferException {
356355
when(httpBasicAuthenticationConfig.getUsername()).thenReturn(USERNAME);
@@ -420,7 +419,7 @@ void testHttpRequestWithInvalidCredentials_with_unsuccessful_response() throws I
420419
when(httpBasicAuthenticationConfig.getUsername()).thenReturn(USERNAME);
421420
when(httpBasicAuthenticationConfig.getPassword()).thenReturn(PASSWORD);
422421
final GrpcAuthenticationProvider grpcAuthenticationProvider = new GrpcBasicAuthenticationProvider(httpBasicAuthenticationConfig);
423-
422+
424423
when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class)))
425424
.thenReturn(grpcAuthenticationProvider);
426425
when(oTelMetricsSourceConfig.getAuthentication()).thenReturn(new PluginModel("http_basic",
@@ -430,17 +429,17 @@ void testHttpRequestWithInvalidCredentials_with_unsuccessful_response() throws I
430429
)));
431430
when(oTelMetricsSourceConfig.enableUnframedRequests()).thenReturn(true);
432431
when(oTelMetricsSourceConfig.getPath()).thenReturn(TEST_PATH);
433-
432+
434433
configureObjectUnderTest();
435434
SOURCE.start(buffer);
436-
435+
437436
final String invalidUsername = "wrong_user";
438437
final String invalidPassword = "wrong_password";
439438
final String invalidCredentials = Base64.getEncoder()
440439
.encodeToString(String.format("%s:%s", invalidUsername, invalidPassword).getBytes(StandardCharsets.UTF_8));
441-
440+
442441
final String transformedPath = "/" + TEST_PIPELINE_NAME + "/v1/metrics";
443-
442+
444443
WebClient.of().prepare()
445444
.post("http://127.0.0.1:21891" + transformedPath)
446445
.content(MediaType.JSON_UTF_8, JsonFormat.printer().print(createExportMetricsRequest()).getBytes())
@@ -450,7 +449,7 @@ void testHttpRequestWithInvalidCredentials_with_unsuccessful_response() throws I
450449
.whenComplete((response, throwable) -> assertSecureResponseWithStatusCode(response, HttpStatus.UNAUTHORIZED, throwable))
451450
.join();
452451
}
453-
452+
454453
@Test
455454
void testGrpcRequestWithInvalidCredentials_with_unsuccessful_response() throws Exception {
456455
when(httpBasicAuthenticationConfig.getUsername()).thenReturn(USERNAME);
@@ -489,10 +488,10 @@ void testHttpWithoutSslFailsWhenSslIsEnabled() throws InvalidProtocolBufferExcep
489488
when(oTelMetricsSourceConfig.getSslKeyFile()).thenReturn("data/certificate/test_decrypted_key.key");
490489
configureObjectUnderTest();
491490
SOURCE.start(buffer);
492-
491+
493492
WebClient client = WebClient.builder("http://127.0.0.1:21891")
494493
.build();
495-
494+
496495
CompletionException exception = assertThrows(CompletionException.class, () -> client.execute(RequestHeaders.builder()
497496
.scheme(SessionProtocol.HTTP)
498497
.authority("127.0.0.1:21891")
@@ -503,28 +502,28 @@ void testHttpWithoutSslFailsWhenSslIsEnabled() throws InvalidProtocolBufferExcep
503502
HttpData.copyOf(JsonFormat.printer().print(createExportMetricsRequest()).getBytes()))
504503
.aggregate()
505504
.join());
506-
505+
507506
assertThat(exception.getCause(), instanceOf(ClosedSessionException.class));
508507
}
509-
508+
510509
@Test
511510
void testGrpcFailsIfSslIsEnabledAndNoTls() {
512511
when(oTelMetricsSourceConfig.isSsl()).thenReturn(true);
513512
when(oTelMetricsSourceConfig.getSslKeyCertChainFile()).thenReturn("data/certificate/test_cert.crt");
514513
when(oTelMetricsSourceConfig.getSslKeyFile()).thenReturn("data/certificate/test_decrypted_key.key");
515514
configureObjectUnderTest();
516515
SOURCE.start(buffer);
517-
516+
518517
MetricsServiceGrpc.MetricsServiceBlockingStub client = Clients.builder(GRPC_ENDPOINT)
519518
.build(MetricsServiceGrpc.MetricsServiceBlockingStub.class);
520-
519+
521520
StatusRuntimeException actualException = assertThrows(StatusRuntimeException.class, () -> client.export(createExportMetricsRequest()));
522-
521+
523522
assertThat(actualException.getStatus(), notNullValue());
524523
assertThat(actualException.getStatus().getCode(), equalTo(Status.Code.UNKNOWN));
525524
}
526-
527-
525+
526+
528527
@Test
529528
void testServerStartCertFileSuccess() throws IOException {
530529
try (MockedStatic<Server> armeriaServerMock = Mockito.mockStatic(Server.class)) {

data-prepper-plugins/otel-trace-source/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ dependencies {
3232
testImplementation 'org.assertj:assertj-core:3.27.3'
3333
testImplementation testLibs.slf4j.simple
3434
testImplementation libs.commons.io
35+
testImplementation 'com.jayway.jsonpath:json-path-assert:2.6.0'
3536
testImplementation 'org.skyscreamer:jsonassert:1.5.3'
3637
}
3738

0 commit comments

Comments
 (0)