diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/HttpSender.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/HttpSender.java index 704d172acd..11783eb539 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/HttpSender.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/HttpSender.java @@ -9,11 +9,10 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.concurrent.CompletableFuture; public interface HttpSender { - CompletableFuture send(BodyWriter writer, int contentLength); + Response send(BodyWriter writer, int contentLength) throws IOException; interface BodyWriter { void writeTo(OutputStream outputStream) throws IOException; diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/OkHttpSender.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/OkHttpSender.java index fd45984de7..6ecebd7f4d 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/OkHttpSender.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/connectivity/http/OkHttpSender.java @@ -8,10 +8,10 @@ import io.opentelemetry.api.internal.InstrumentationUtil; import java.io.IOException; import java.io.InputStream; -import java.util.concurrent.CompletableFuture; +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; -import okhttp3.Call; -import okhttp3.Callback; import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -21,6 +21,7 @@ public final class OkHttpSender implements HttpSender { private final OkHttpClient client; private final String url; + private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(30); public static OkHttpSender create(String url) { return create(url, new OkHttpClient()); @@ -35,40 +36,38 @@ public static OkHttpSender create(String url, OkHttpClient client) { private OkHttpSender(String url, OkHttpClient client) { this.url = url; - this.client = client; + this.client = client.newBuilder().callTimeout(REQUEST_TIMEOUT).build(); } @Override - public CompletableFuture send(BodyWriter writer, int contentLength) { - CompletableFuture future = new CompletableFuture<>(); + public Response send(BodyWriter writer, int contentLength) throws IOException { Request.Builder builder = new Request.Builder().url(url); builder.addHeader("Content-Type", CONTENT_TYPE); RequestBody body = new RawRequestBody(writer, contentLength, MEDIA_TYPE); builder.post(body); + AtomicReference responseRef = new AtomicReference<>(); + AtomicReference errorRef = new AtomicReference<>(); // By suppressing instrumentations, we prevent automatic instrumentations for the okhttp request // that polls the opamp server. - InstrumentationUtil.suppressInstrumentation(() -> doSendRequest(builder.build(), future)); - - return future; + InstrumentationUtil.suppressInstrumentation( + () -> { + try { + responseRef.set(doSendRequest(builder.build())); + } catch (IOException e) { + errorRef.set(e); + } + }); + if (errorRef.get() != null) { + throw errorRef.get(); + } + return Objects.requireNonNull(responseRef.get()); } - private void doSendRequest(Request request, CompletableFuture future) { - client - .newCall(request) - .enqueue( - new Callback() { - @Override - public void onResponse(Call call, okhttp3.Response response) { - future.complete(new OkHttpResponse(response)); - } - - @Override - public void onFailure(Call call, IOException e) { - future.completeExceptionally(e); - } - }); + private Response doSendRequest(Request request) throws IOException { + okhttp3.Response response = client.newCall(request).execute(); + return new OkHttpResponse(response); } private static class OkHttpResponse implements Response { diff --git a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java index 2976ecd04e..08b96c9079 100644 --- a/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java +++ b/opamp-client/src/main/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestService.java @@ -19,13 +19,10 @@ import java.time.Duration; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -149,9 +146,8 @@ private void doSendRequest() { AgentToServer agentToServer = Objects.requireNonNull(requestSupplier).get().getAgentToServer(); byte[] data = agentToServer.encodeByteString().toByteArray(); - CompletableFuture future = - requestSender.send(outputStream -> outputStream.write(data), data.length); - try (HttpSender.Response response = future.get(30, TimeUnit.SECONDS)) { + try (HttpSender.Response response = + requestSender.send(outputStream -> outputStream.write(data), data.length)) { if (isSuccessful(response)) { ServerToAgent serverToAgent = ServerToAgent.ADAPTER.decode(response.bodyInputStream()); getCallback().onConnectionSuccess(); @@ -159,16 +155,9 @@ private void doSendRequest() { } else { handleHttpError(response); } - } catch (IOException | InterruptedException | TimeoutException e) { + } catch (IOException e) { getCallback().onConnectionFailed(e); connectionStatus.retryAfter(null); - } catch (ExecutionException e) { - if (e.getCause() != null) { - getCallback().onConnectionFailed(e.getCause()); - } else { - getCallback().onConnectionFailed(e); - } - connectionStatus.retryAfter(null); } catch (RuntimeException e) { getCallback().onRequestFailed(e); connectionStatus.retryAfter(null); diff --git a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java index 8c3f3831da..4a86ec61a9 100644 --- a/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java +++ b/opamp-client/src/test/java/io/opentelemetry/opamp/client/internal/request/service/HttpRequestServiceTest.java @@ -10,7 +10,6 @@ import static org.assertj.core.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.clearInvocations; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -26,6 +25,8 @@ import io.opentelemetry.opamp.client.request.service.RequestService; import java.io.ByteArrayInputStream; import java.io.EOFException; +import java.io.IOException; +import java.net.SocketTimeoutException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -34,10 +35,6 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Queue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import opamp.proto.AgentToServer; import opamp.proto.RetryInfo; import opamp.proto.ServerErrorResponse; @@ -194,12 +191,9 @@ void verifySendingRequest_whenTheresAParsingError() { } @Test - void verifySendingRequest_whenThereIsAnExecutionError() - throws ExecutionException, InterruptedException, TimeoutException { - CompletableFuture future = mock(); - requestSender.enqueueResponseFuture(future); - Exception myException = mock(); - doThrow(new ExecutionException(myException)).when(future).get(30, TimeUnit.SECONDS); + void verifySendingRequest_whenThereIsAnIOException() throws IOException { + IOException myException = mock(); + requestSender.enqueueException(myException); httpRequestService.sendRequest(); @@ -208,12 +202,9 @@ void verifySendingRequest_whenThereIsAnExecutionError() } @Test - void verifySendingRequest_whenThereIsAnInterruptedException() - throws ExecutionException, InterruptedException, TimeoutException { - CompletableFuture future = mock(); - requestSender.enqueueResponseFuture(future); - InterruptedException myException = mock(); - doThrow(myException).when(future).get(30, TimeUnit.SECONDS); + void verifySendingRequest_whenThereIsATimeoutException() throws IOException { + SocketTimeoutException myException = mock(); + requestSender.enqueueException(myException); httpRequestService.sendRequest(); @@ -406,26 +397,29 @@ private static class TestHttpSender implements HttpSender { private final List requests = new ArrayList<>(); @SuppressWarnings("JdkObsolete") - private final Queue> responses = new LinkedList<>(); + private final Queue responses = new LinkedList<>(); @Override - public CompletableFuture send(BodyWriter writer, int contentLength) { + public HttpSender.Response send(BodyWriter writer, int contentLength) throws IOException { requests.add(new RequestParams(contentLength)); - CompletableFuture response = null; + Object response = null; try { response = responses.remove(); } catch (NoSuchElementException e) { fail("Unwanted triggered request"); } - return response; + if (response instanceof IOException) { + throw (IOException) response; + } + return (HttpSender.Response) response; } void enqueueResponse(HttpSender.Response response) { - enqueueResponseFuture(CompletableFuture.completedFuture(response)); + responses.add(response); } - void enqueueResponseFuture(CompletableFuture future) { - responses.add(future); + void enqueueException(IOException exception) { + responses.add(exception); } List getRequests(int size) {