From 362a1dc88b384383c9d18aa34a17f76149f1b7b0 Mon Sep 17 00:00:00 2001 From: Jason Plumb Date: Fri, 1 May 2026 11:14:25 -0700 Subject: [PATCH 1/8] Remove CompletableFuture usage from opamp client HTTP path --- .../connectivity/http/HttpSender.java | 4 +- .../connectivity/http/OkHttpSender.java | 51 ++++++++++--------- .../request/service/HttpRequestService.java | 16 ++---- .../service/HttpRequestServiceTest.java | 43 ++++++++-------- 4 files changed, 54 insertions(+), 60 deletions(-) diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/HttpSender.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/HttpSender.java index 704d172acd..4235f14541 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/HttpSender.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/HttpSender.java @@ -9,11 +9,11 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; public interface HttpSender { - CompletableFuture send(BodyWriter writer, int contentLength); + Response send(BodyWriter writer, int contentLength) throws IOException, TimeoutException; interface BodyWriter { void writeTo(OutputStream outputStream) throws IOException; diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/OkHttpSender.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/OkHttpSender.java index fd45984de7..354c3a611b 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/OkHttpSender.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/OkHttpSender.java @@ -8,10 +8,10 @@ import io.opentelemetry.api.internal.InstrumentationUtil; import java.io.IOException; import java.io.InputStream; -import java.util.concurrent.CompletableFuture; +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; -import okhttp3.Call; -import okhttp3.Callback; import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -21,6 +21,7 @@ public final class OkHttpSender implements HttpSender { private final OkHttpClient client; private final String url; + private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(30); public static OkHttpSender create(String url) { return create(url, new OkHttpClient()); @@ -39,36 +40,40 @@ private OkHttpSender(String url, OkHttpClient client) { } @Override - public CompletableFuture send(BodyWriter writer, int contentLength) { - CompletableFuture future = new CompletableFuture<>(); + public Response send(BodyWriter writer, int contentLength) throws IOException { Request.Builder builder = new Request.Builder().url(url); builder.addHeader("Content-Type", CONTENT_TYPE); RequestBody body = new RawRequestBody(writer, contentLength, MEDIA_TYPE); builder.post(body); + AtomicReference responseRef = new AtomicReference<>(); + AtomicReference errorRef = new AtomicReference<>(); // By suppressing instrumentations, we prevent automatic instrumentations for the okhttp request // that polls the opamp server. - InstrumentationUtil.suppressInstrumentation(() -> doSendRequest(builder.build(), future)); - - return future; + InstrumentationUtil.suppressInstrumentation( + () -> { + try { + responseRef.set(doSendRequest(builder.build())); + } catch (IOException e) { + errorRef.set(e); + } + }); + if (errorRef.get() != null) { + throw errorRef.get(); + } + return Objects.requireNonNull(responseRef.get()); } - private void doSendRequest(Request request, CompletableFuture future) { - client - .newCall(request) - .enqueue( - new Callback() { - @Override - public void onResponse(Call call, okhttp3.Response response) { - future.complete(new OkHttpResponse(response)); - } - - @Override - public void onFailure(Call call, IOException e) { - future.completeExceptionally(e); - } - }); + private Response doSendRequest(Request request) throws IOException { + okhttp3.Response response = + client + .newBuilder() + .callTimeout(REQUEST_TIMEOUT) + .build() + .newCall(request) + .execute(); + return new OkHttpResponse(response); } private static class OkHttpResponse implements Response { diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java index 2976ecd04e..541fa4ba76 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java @@ -19,8 +19,6 @@ import java.time.Duration; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -149,9 +147,8 @@ private void doSendRequest() { AgentToServer agentToServer = Objects.requireNonNull(requestSupplier).get().getAgentToServer(); byte[] data = agentToServer.encodeByteString().toByteArray(); - CompletableFuture future = - requestSender.send(outputStream -> outputStream.write(data), data.length); - try (HttpSender.Response response = future.get(30, TimeUnit.SECONDS)) { + try (HttpSender.Response response = + requestSender.send(outputStream -> outputStream.write(data), data.length)) { if (isSuccessful(response)) { ServerToAgent serverToAgent = ServerToAgent.ADAPTER.decode(response.bodyInputStream()); getCallback().onConnectionSuccess(); @@ -159,16 +156,9 @@ private void doSendRequest() { } else { handleHttpError(response); } - } catch (IOException | InterruptedException | TimeoutException e) { + } catch (IOException | TimeoutException e) { getCallback().onConnectionFailed(e); connectionStatus.retryAfter(null); - } catch (ExecutionException e) { - if (e.getCause() != null) { - getCallback().onConnectionFailed(e.getCause()); - } else { - getCallback().onConnectionFailed(e); - } - connectionStatus.retryAfter(null); } catch (RuntimeException e) { getCallback().onRequestFailed(e); connectionStatus.retryAfter(null); diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java index 8c3f3831da..325763146c 100644 --- a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java @@ -10,7 +10,6 @@ import static org.assertj.core.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.clearInvocations; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -26,6 +25,7 @@ import io.opentelemetry.opamp.client.request.service.RequestService; import java.io.ByteArrayInputStream; import java.io.EOFException; +import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -34,9 +34,6 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Queue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import opamp.proto.AgentToServer; import opamp.proto.RetryInfo; @@ -195,11 +192,9 @@ void verifySendingRequest_whenTheresAParsingError() { @Test void verifySendingRequest_whenThereIsAnExecutionError() - throws ExecutionException, InterruptedException, TimeoutException { - CompletableFuture future = mock(); - requestSender.enqueueResponseFuture(future); - Exception myException = mock(); - doThrow(new ExecutionException(myException)).when(future).get(30, TimeUnit.SECONDS); + throws IOException, TimeoutException { + IOException myException = mock(); + requestSender.enqueueException(myException); httpRequestService.sendRequest(); @@ -208,12 +203,9 @@ void verifySendingRequest_whenThereIsAnExecutionError() } @Test - void verifySendingRequest_whenThereIsAnInterruptedException() - throws ExecutionException, InterruptedException, TimeoutException { - CompletableFuture future = mock(); - requestSender.enqueueResponseFuture(future); - InterruptedException myException = mock(); - doThrow(myException).when(future).get(30, TimeUnit.SECONDS); + void verifySendingRequest_whenThereIsATimeoutException() throws IOException, TimeoutException { + TimeoutException myException = mock(); + requestSender.enqueueException(myException); httpRequestService.sendRequest(); @@ -406,26 +398,33 @@ private static class TestHttpSender implements HttpSender { private final List requests = new ArrayList<>(); @SuppressWarnings("JdkObsolete") - private final Queue> responses = new LinkedList<>(); + private final Queue responses = new LinkedList<>(); @Override - public CompletableFuture send(BodyWriter writer, int contentLength) { + public HttpSender.Response send(BodyWriter writer, int contentLength) + throws IOException, TimeoutException { requests.add(new RequestParams(contentLength)); - CompletableFuture response = null; + Object response = null; try { response = responses.remove(); } catch (NoSuchElementException e) { fail("Unwanted triggered request"); } - return response; + if (response instanceof IOException) { + throw (IOException) response; + } + if (response instanceof TimeoutException) { + throw (TimeoutException) response; + } + return (HttpSender.Response) response; } void enqueueResponse(HttpSender.Response response) { - enqueueResponseFuture(CompletableFuture.completedFuture(response)); + responses.add(response); } - void enqueueResponseFuture(CompletableFuture future) { - responses.add(future); + void enqueueException(Exception exception) { + responses.add(exception); } List getRequests(int size) { From 69fdbedb2afe9a529c48cf597d74d03894578a12 Mon Sep 17 00:00:00 2001 From: Jason Plumb Date: Fri, 1 May 2026 11:16:00 -0700 Subject: [PATCH 2/8] remove exceptions --- .../client/internal/request/service/HttpRequestServiceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java index 325763146c..a93f9b98fc 100644 --- a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java @@ -203,7 +203,7 @@ void verifySendingRequest_whenThereIsAnExecutionError() } @Test - void verifySendingRequest_whenThereIsATimeoutException() throws IOException, TimeoutException { + void verifySendingRequest_whenThereIsATimeoutException() { TimeoutException myException = mock(); requestSender.enqueueException(myException); From 612492383ab2c60bf2ac7060c7f23d43ab8a2f1b Mon Sep 17 00:00:00 2001 From: Jason Plumb Date: Fri, 1 May 2026 11:27:16 -0700 Subject: [PATCH 3/8] Update Gummy Bears API to 0.14.0 --- .../src/main/kotlin/otel.animalsniffer-conventions.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildSrc/src/main/kotlin/otel.animalsniffer-conventions.gradle.kts b/buildSrc/src/main/kotlin/otel.animalsniffer-conventions.gradle.kts index e4833f6d4c..60cb5ccd9a 100644 --- a/buildSrc/src/main/kotlin/otel.animalsniffer-conventions.gradle.kts +++ b/buildSrc/src/main/kotlin/otel.animalsniffer-conventions.gradle.kts @@ -6,7 +6,7 @@ plugins { } dependencies { - signature("com.toasttab.android:gummy-bears-api-23:0.13.0:coreLib@signature") + signature("com.toasttab.android:gummy-bears-api-23:0.14.0:coreLib@signature") } animalsniffer { From 924a009e5ef627f2100d78465b73bf651c35ba87 Mon Sep 17 00:00:00 2001 From: Jason Plumb Date: Fri, 1 May 2026 11:53:47 -0700 Subject: [PATCH 4/8] Apply Spotless formatting --- .../client/internal/connectivity/http/OkHttpSender.java | 7 +------ .../internal/request/service/HttpRequestServiceTest.java | 3 +-- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/OkHttpSender.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/OkHttpSender.java index 354c3a611b..00a66c3979 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/OkHttpSender.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/OkHttpSender.java @@ -67,12 +67,7 @@ public Response send(BodyWriter writer, int contentLength) throws IOException { private Response doSendRequest(Request request) throws IOException { okhttp3.Response response = - client - .newBuilder() - .callTimeout(REQUEST_TIMEOUT) - .build() - .newCall(request) - .execute(); + client.newBuilder().callTimeout(REQUEST_TIMEOUT).build().newCall(request).execute(); return new OkHttpResponse(response); } diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java index a93f9b98fc..15eaea15ef 100644 --- a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java @@ -191,8 +191,7 @@ void verifySendingRequest_whenTheresAParsingError() { } @Test - void verifySendingRequest_whenThereIsAnExecutionError() - throws IOException, TimeoutException { + void verifySendingRequest_whenThereIsAnExecutionError() throws IOException, TimeoutException { IOException myException = mock(); requestSender.enqueueException(myException); From c4ed12edb155f4cfeb526876a9ca4a3ee75fa6b1 Mon Sep 17 00:00:00 2001 From: Jason Plumb Date: Fri, 1 May 2026 11:58:36 -0700 Subject: [PATCH 5/8] Reuse timeout-configured OkHttp client --- .../client/internal/connectivity/http/OkHttpSender.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/OkHttpSender.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/OkHttpSender.java index 00a66c3979..6ecebd7f4d 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/OkHttpSender.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/OkHttpSender.java @@ -36,7 +36,7 @@ public static OkHttpSender create(String url, OkHttpClient client) { private OkHttpSender(String url, OkHttpClient client) { this.url = url; - this.client = client; + this.client = client.newBuilder().callTimeout(REQUEST_TIMEOUT).build(); } @Override @@ -66,8 +66,7 @@ public Response send(BodyWriter writer, int contentLength) throws IOException { } private Response doSendRequest(Request request) throws IOException { - okhttp3.Response response = - client.newBuilder().callTimeout(REQUEST_TIMEOUT).build().newCall(request).execute(); + okhttp3.Response response = client.newCall(request).execute(); return new OkHttpResponse(response); } From 878ac3e469b7f83ae03a30a6a4b5ff768cd6ce68 Mon Sep 17 00:00:00 2001 From: Jason Plumb Date: Fri, 1 May 2026 12:01:20 -0700 Subject: [PATCH 6/8] remove timeoutexception and tighten contract. --- .../internal/connectivity/http/HttpSender.java | 3 +-- .../request/service/HttpRequestService.java | 3 +-- .../request/service/HttpRequestServiceTest.java | 16 ++++++---------- 3 files changed, 8 insertions(+), 14 deletions(-) diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/HttpSender.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/HttpSender.java index 4235f14541..11783eb539 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/HttpSender.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/HttpSender.java @@ -9,11 +9,10 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.concurrent.TimeoutException; public interface HttpSender { - Response send(BodyWriter writer, int contentLength) throws IOException, TimeoutException; + Response send(BodyWriter writer, int contentLength) throws IOException; interface BodyWriter { void writeTo(OutputStream outputStream) throws IOException; diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java index 541fa4ba76..08b96c9079 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java @@ -23,7 +23,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -156,7 +155,7 @@ private void doSendRequest() { } else { handleHttpError(response); } - } catch (IOException | TimeoutException e) { + } catch (IOException e) { getCallback().onConnectionFailed(e); connectionStatus.retryAfter(null); } catch (RuntimeException e) { diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java index 15eaea15ef..b0ff64450e 100644 --- a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java @@ -26,6 +26,7 @@ import java.io.ByteArrayInputStream; import java.io.EOFException; import java.io.IOException; +import java.net.SocketTimeoutException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -34,7 +35,6 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Queue; -import java.util.concurrent.TimeoutException; import opamp.proto.AgentToServer; import opamp.proto.RetryInfo; import opamp.proto.ServerErrorResponse; @@ -191,7 +191,7 @@ void verifySendingRequest_whenTheresAParsingError() { } @Test - void verifySendingRequest_whenThereIsAnExecutionError() throws IOException, TimeoutException { + void verifySendingRequest_whenThereIsAnExecutionError() throws IOException { IOException myException = mock(); requestSender.enqueueException(myException); @@ -202,8 +202,8 @@ void verifySendingRequest_whenThereIsAnExecutionError() throws IOException, Time } @Test - void verifySendingRequest_whenThereIsATimeoutException() { - TimeoutException myException = mock(); + void verifySendingRequest_whenThereIsATimeoutException() throws IOException { + SocketTimeoutException myException = mock(); requestSender.enqueueException(myException); httpRequestService.sendRequest(); @@ -400,8 +400,7 @@ private static class TestHttpSender implements HttpSender { private final Queue responses = new LinkedList<>(); @Override - public HttpSender.Response send(BodyWriter writer, int contentLength) - throws IOException, TimeoutException { + public HttpSender.Response send(BodyWriter writer, int contentLength) throws IOException { requests.add(new RequestParams(contentLength)); Object response = null; try { @@ -412,9 +411,6 @@ public HttpSender.Response send(BodyWriter writer, int contentLength) if (response instanceof IOException) { throw (IOException) response; } - if (response instanceof TimeoutException) { - throw (TimeoutException) response; - } return (HttpSender.Response) response; } @@ -422,7 +418,7 @@ void enqueueResponse(HttpSender.Response response) { responses.add(response); } - void enqueueException(Exception exception) { + void enqueueException(IOException exception) { responses.add(exception); } From b86d8ad06a16e0254f0de9cf5461cdde3347b83a Mon Sep 17 00:00:00 2001 From: Jason Plumb Date: Fri, 1 May 2026 12:03:39 -0700 Subject: [PATCH 7/8] Rename IOException request test --- .../client/internal/request/service/HttpRequestServiceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java index b0ff64450e..4a86ec61a9 100644 --- a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java @@ -191,7 +191,7 @@ void verifySendingRequest_whenTheresAParsingError() { } @Test - void verifySendingRequest_whenThereIsAnExecutionError() throws IOException { + void verifySendingRequest_whenThereIsAnIOException() throws IOException { IOException myException = mock(); requestSender.enqueueException(myException); From 8eb1868abcb91f5751fe4359f848990e97482205 Mon Sep 17 00:00:00 2001 From: Jason Plumb Date: Fri, 1 May 2026 12:10:47 -0700 Subject: [PATCH 8/8] Revert shared Gummy Bears upgrade --- .../src/main/kotlin/otel.animalsniffer-conventions.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildSrc/src/main/kotlin/otel.animalsniffer-conventions.gradle.kts b/buildSrc/src/main/kotlin/otel.animalsniffer-conventions.gradle.kts index 60cb5ccd9a..e4833f6d4c 100644 --- a/buildSrc/src/main/kotlin/otel.animalsniffer-conventions.gradle.kts +++ b/buildSrc/src/main/kotlin/otel.animalsniffer-conventions.gradle.kts @@ -6,7 +6,7 @@ plugins { } dependencies { - signature("com.toasttab.android:gummy-bears-api-23:0.14.0:coreLib@signature") + signature("com.toasttab.android:gummy-bears-api-23:0.13.0:coreLib@signature") } animalsniffer {