Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
2965964
Update OpenTelemetryProto to 1.3.2-alpha and refactor scope usage
Nov 5, 2024
975741f
[WIP] Process ExportTraceServiceRequest in http service
Nov 15, 2024
cfe31d9
[WIP] Move gRPC request tests to own class
Nov 29, 2024
495edf8
[WIP] Cleanup
Nov 29, 2024
6ebcd57
[WIP] Separate concerns when it comes to configuring the server/services
Dec 4, 2024
4591822
[WIP] Use retry calculator to provide backoff info
Dec 4, 2024
6957288
[WIP] Add metrics to http exception handler
Dec 13, 2024
c96eee2
[WIP] Revert accidental changes
Dec 13, 2024
2640d44
[WIP] Infer protocol from config. Isolate tests regarding unframed re…
Dec 13, 2024
2becfde
[WIP] Add basic auth to http service
Dec 17, 2024
b084fa7
[WIP] Move configuration of http service into own class
Dec 20, 2024
fa101ab
[WIP] Add pr description
Dec 20, 2024
dec354e
[WIP] Remove pr description
Jan 10, 2025
4f6312d
[WIP] Fix checkstyle findings
Jan 17, 2025
6c6fbff
[WIP] Fix issue with http service being enabled, while grpc service a…
Jan 24, 2025
59003ab
Refactor EndToEndRawSpanTest
Feb 14, 2025
f4c6fcf
Create ArmeriaAuthenticationProvider via PluginFactory
Feb 14, 2025
cf3b2e7
Rename GrpcRetryInfoCalculator
Feb 14, 2025
c35d910
Create test for invalid payload
Feb 14, 2025
19d2e99
Add test for healthcheck
Feb 14, 2025
f56079f
Add test for http exception handler
Feb 14, 2025
a342e14
Remove tests
Feb 14, 2025
6c230f3
Fix missing imports
TomasLongo Apr 11, 2025
fa2e95d
Remove unused imports
TomasLongo Apr 15, 2025
4c694f5
Fix imports
TomasLongo May 14, 2025
bfdc669
Remove/edit todos
TomasLongo May 23, 2025
702a568
Remove accidentally added default password
TomasLongo May 23, 2025
655abe4
Declare assertJ as test lib and reference it from e2e test
TomasLongo Jun 20, 2025
23766cf
Merge branch 'main' into update-otel-proto
TomasLongo Oct 27, 2025
fe1e86a
Fix merge error
TomasLongo Oct 27, 2025
39c8142
Use insecure ssl connection in test
TomasLongo Oct 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ public class GrpcRequestExceptionHandler implements GoogleGrpcExceptionHandlerFu
private final Counter badRequestsCounter;
private final Counter requestsTooLargeCounter;
private final Counter internalServerErrorCounter;
private final GrpcRetryInfoCalculator retryInfoCalculator;
private final RetryInfoCalculator retryInfoCalculator;

public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics, Duration retryInfoMinDelay, Duration retryInfoMaxDelay) {
requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS);
badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS);
requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE);
internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR);
retryInfoCalculator = new GrpcRetryInfoCalculator(retryInfoMinDelay, retryInfoMaxDelay);
retryInfoCalculator = new RetryInfoCalculator(retryInfoMinDelay, retryInfoMaxDelay);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

class GrpcRetryInfoCalculator {
public class RetryInfoCalculator {

private final Duration minimumDelay;
private final Duration maximumDelay;

private final AtomicReference<Instant> lastTimeCalled;
private final AtomicReference<Duration> nextDelay;

GrpcRetryInfoCalculator(Duration minimumDelay, Duration maximumDelay) {
public RetryInfoCalculator(Duration minimumDelay, Duration maximumDelay) {
this.minimumDelay = minimumDelay;
this.maximumDelay = maximumDelay;
// Create a cushion so that the calculator treats a first quick exception (after prepper startup) as normal request (e.g. does not calculate a backoff)
Expand All @@ -34,7 +34,7 @@ private static com.google.protobuf.Duration.Builder mapDuration(Duration duratio
return com.google.protobuf.Duration.newBuilder().setSeconds(duration.getSeconds()).setNanos(duration.getNano());
}

RetryInfo createRetryInfo() {
public RetryInfo createRetryInfo() {
Instant now = Instant.now();
// Is the last time we got called longer ago than the next delay?
if (lastTimeCalled.getAndSet(now).isBefore(now.minus(nextDelay.get()))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ public class GrpcRetryInfoCalculatorTest {

@Test
public void testMinimumDelayOnFirstCall() {
RetryInfo retryInfo = new GrpcRetryInfoCalculator(Duration.ofMillis(100), Duration.ofSeconds(1)).createRetryInfo();
RetryInfo retryInfo = new RetryInfoCalculator(Duration.ofMillis(100), Duration.ofSeconds(1)).createRetryInfo();

assertThat(retryInfo.getRetryDelay().getNanos(), equalTo(100_000_000));
assertThat(retryInfo.getRetryDelay().getSeconds(), equalTo(0L));
}

@Test
public void testExponentialBackoff() {
GrpcRetryInfoCalculator calculator =
new GrpcRetryInfoCalculator(Duration.ofSeconds(1), Duration.ofSeconds(10));
RetryInfoCalculator calculator =
new RetryInfoCalculator(Duration.ofSeconds(1), Duration.ofSeconds(10));
RetryInfo retryInfo1 = calculator.createRetryInfo();
RetryInfo retryInfo2 = calculator.createRetryInfo();
RetryInfo retryInfo3 = calculator.createRetryInfo();
Expand All @@ -35,8 +35,8 @@ public void testExponentialBackoff() {

@Test
public void testUsesMaximumAsLongestDelay() {
GrpcRetryInfoCalculator calculator =
new GrpcRetryInfoCalculator(Duration.ofSeconds(1), Duration.ofSeconds(2));
RetryInfoCalculator calculator =
new RetryInfoCalculator(Duration.ofSeconds(1), Duration.ofSeconds(2));
RetryInfo retryInfo1 = calculator.createRetryInfo();
RetryInfo retryInfo2 = calculator.createRetryInfo();
RetryInfo retryInfo3 = calculator.createRetryInfo();
Expand All @@ -49,8 +49,8 @@ public void testUsesMaximumAsLongestDelay() {
@Test
public void testResetAfterDelayWearsOff() throws InterruptedException {
int minDelayNanos = 1_000_000;
GrpcRetryInfoCalculator calculator =
new GrpcRetryInfoCalculator(Duration.ofNanos(minDelayNanos), Duration.ofSeconds(1));
RetryInfoCalculator calculator =
new RetryInfoCalculator(Duration.ofNanos(minDelayNanos), Duration.ofSeconds(1));

RetryInfo retryInfo1 = calculator.createRetryInfo();
RetryInfo retryInfo2 = calculator.createRetryInfo();
Expand All @@ -66,8 +66,8 @@ public void testResetAfterDelayWearsOff() throws InterruptedException {

@Test
public void testQuickFirstExceptionDoesNotTriggerBackoffCalculationEvenWithLongMinDelay() throws InterruptedException {
GrpcRetryInfoCalculator calculator =
new GrpcRetryInfoCalculator(Duration.ofSeconds(10), Duration.ofSeconds(20));
RetryInfoCalculator calculator =
new RetryInfoCalculator(Duration.ofSeconds(10), Duration.ofSeconds(20));

RetryInfo retryInfo1 = calculator.createRetryInfo();
RetryInfo retryInfo2 = calculator.createRetryInfo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,6 @@ void testHttpFullJsonWithCustomPathAndUnframedRequests() throws InvalidProtocolB
.join();
}


@Test
void testHttpFullJsonWithCustomPathAndAuthHeader_with_successful_response() throws InvalidProtocolBufferException {
when(httpBasicAuthenticationConfig.getUsername()).thenReturn(USERNAME);
Expand Down Expand Up @@ -420,7 +419,7 @@ void testHttpRequestWithInvalidCredentials_with_unsuccessful_response() throws I
when(httpBasicAuthenticationConfig.getUsername()).thenReturn(USERNAME);
when(httpBasicAuthenticationConfig.getPassword()).thenReturn(PASSWORD);
final GrpcAuthenticationProvider grpcAuthenticationProvider = new GrpcBasicAuthenticationProvider(httpBasicAuthenticationConfig);

when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class)))
.thenReturn(grpcAuthenticationProvider);
when(oTelMetricsSourceConfig.getAuthentication()).thenReturn(new PluginModel("http_basic",
Expand All @@ -430,17 +429,17 @@ void testHttpRequestWithInvalidCredentials_with_unsuccessful_response() throws I
)));
when(oTelMetricsSourceConfig.enableUnframedRequests()).thenReturn(true);
when(oTelMetricsSourceConfig.getPath()).thenReturn(TEST_PATH);

configureObjectUnderTest();
SOURCE.start(buffer);

final String invalidUsername = "wrong_user";
final String invalidPassword = "wrong_password";
final String invalidCredentials = Base64.getEncoder()
.encodeToString(String.format("%s:%s", invalidUsername, invalidPassword).getBytes(StandardCharsets.UTF_8));

final String transformedPath = "/" + TEST_PIPELINE_NAME + "/v1/metrics";

WebClient.of().prepare()
.post("http://127.0.0.1:21891" + transformedPath)
.content(MediaType.JSON_UTF_8, JsonFormat.printer().print(createExportMetricsRequest()).getBytes())
Expand All @@ -450,7 +449,7 @@ void testHttpRequestWithInvalidCredentials_with_unsuccessful_response() throws I
.whenComplete((response, throwable) -> assertSecureResponseWithStatusCode(response, HttpStatus.UNAUTHORIZED, throwable))
.join();
}

@Test
void testGrpcRequestWithInvalidCredentials_with_unsuccessful_response() throws Exception {
when(httpBasicAuthenticationConfig.getUsername()).thenReturn(USERNAME);
Expand Down Expand Up @@ -489,10 +488,10 @@ void testHttpWithoutSslFailsWhenSslIsEnabled() throws InvalidProtocolBufferExcep
when(oTelMetricsSourceConfig.getSslKeyFile()).thenReturn("data/certificate/test_decrypted_key.key");
configureObjectUnderTest();
SOURCE.start(buffer);

WebClient client = WebClient.builder("http://127.0.0.1:21891")
.build();

CompletionException exception = assertThrows(CompletionException.class, () -> client.execute(RequestHeaders.builder()
.scheme(SessionProtocol.HTTP)
.authority("127.0.0.1:21891")
Expand All @@ -503,28 +502,28 @@ void testHttpWithoutSslFailsWhenSslIsEnabled() throws InvalidProtocolBufferExcep
HttpData.copyOf(JsonFormat.printer().print(createExportMetricsRequest()).getBytes()))
.aggregate()
.join());

assertThat(exception.getCause(), instanceOf(ClosedSessionException.class));
}

@Test
void testGrpcFailsIfSslIsEnabledAndNoTls() {
when(oTelMetricsSourceConfig.isSsl()).thenReturn(true);
when(oTelMetricsSourceConfig.getSslKeyCertChainFile()).thenReturn("data/certificate/test_cert.crt");
when(oTelMetricsSourceConfig.getSslKeyFile()).thenReturn("data/certificate/test_decrypted_key.key");
configureObjectUnderTest();
SOURCE.start(buffer);

MetricsServiceGrpc.MetricsServiceBlockingStub client = Clients.builder(GRPC_ENDPOINT)
.build(MetricsServiceGrpc.MetricsServiceBlockingStub.class);

StatusRuntimeException actualException = assertThrows(StatusRuntimeException.class, () -> client.export(createExportMetricsRequest()));

assertThat(actualException.getStatus(), notNullValue());
assertThat(actualException.getStatus().getCode(), equalTo(Status.Code.UNKNOWN));
}


@Test
void testServerStartCertFileSuccess() throws IOException {
try (MockedStatic<Server> armeriaServerMock = Mockito.mockStatic(Server.class)) {
Expand Down
1 change: 1 addition & 0 deletions data-prepper-plugins/otel-trace-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies {
testImplementation 'org.assertj:assertj-core:3.27.3'
testImplementation testLibs.slf4j.simple
testImplementation libs.commons.io
testImplementation 'com.jayway.jsonpath:json-path-assert:2.6.0'
testImplementation 'org.skyscreamer:jsonassert:1.5.3'
}

Expand Down
Loading
Loading